diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc
index 4013ce9879a..b965ad1562e 100644
--- a/test/cpp/interop/xds_interop_client.cc
+++ b/test/cpp/interop/xds_interop_client.cc
@@ -24,6 +24,7 @@
 #include <atomic>
 #include <chrono>
 #include <condition_variable>
+#include <deque>
 #include <map>
 #include <mutex>
 #include <set>
@@ -63,24 +64,48 @@
 using grpc::Server;
 using grpc::ServerBuilder;
 using grpc::ServerContext;
 using grpc::Status;
+using grpc::testing::ClientConfigureRequest;
+using grpc::testing::ClientConfigureRequest_RpcType_Name;
+using grpc::testing::ClientConfigureResponse;
 using grpc::testing::Empty;
+using grpc::testing::LoadBalancerAccumulatedStatsRequest;
+using grpc::testing::LoadBalancerAccumulatedStatsResponse;
 using grpc::testing::LoadBalancerStatsRequest;
 using grpc::testing::LoadBalancerStatsResponse;
 using grpc::testing::LoadBalancerStatsService;
 using grpc::testing::SimpleRequest;
 using grpc::testing::SimpleResponse;
 using grpc::testing::TestService;
+using grpc::testing::XdsUpdateClientConfigureService;
 
 class XdsStatsWatcher;
 
-// Unique ID for each outgoing RPC
-int global_request_id;
-// Stores a set of watchers that should be notified upon outgoing RPC
-// completion
-std::set<XdsStatsWatcher*> watchers;
-// Mutex for global_request_id and watchers
-std::mutex mu;
+struct StatsWatchers {
+  // Unique ID for each outgoing RPC
+  int global_request_id = 0;
+  // Unique ID for each outgoing RPC by RPC method type
+  std::map<int, int> global_request_id_by_type;
+  // Stores a set of watchers that should be notified upon outgoing RPC
+  // completion
+  std::set<XdsStatsWatcher*> watchers;
+  // Global watcher for accumulated stats.
+  XdsStatsWatcher* global_watcher;
+  // Mutex for global_request_id and watchers
+  std::mutex mu;
+};
 // Whether at least one RPC has succeeded, indicating xDS resolution completed.
 std::atomic<bool> one_rpc_succeeded(false);
+// RPC configuration detailing how RPC should be sent.
+struct RpcConfig {
+  ClientConfigureRequest::RpcType type;
+  std::vector<std::pair<std::string, std::string>> metadata;
+};
+struct RpcConfigurationsQueue {
+  // A queue of RPC configurations detailing how RPCs should be sent.
+  std::deque<std::vector<RpcConfig>> rpc_configs_queue;
+  // Mutex for rpc_configs_queue
+  std::mutex mu_rpc_configs_queue;
+};
 
 /** Records the remote peer distribution for a given range of RPCs. */
 class XdsStatsWatcher {
@@ -88,16 +113,25 @@ class XdsStatsWatcher {
   XdsStatsWatcher(int start_id, int end_id)
       : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
 
-  void RpcCompleted(int request_id, const std::string& rpc_method,
+  // Upon the completion of an RPC, we will look at the request_id, the
+  // rpc_type, and the peer the RPC was sent to in order to count
+  // this RPC into the right stats bin.
+  void RpcCompleted(int request_id,
+                    const ClientConfigureRequest::RpcType rpc_type,
                     const std::string& peer) {
-    if (start_id_ <= request_id && request_id < end_id_) {
+    // We count RPCs for global watcher or if the request_id falls into the
+    // watcher's interested range of request ids.
+    if ((start_id_ == 0 && end_id_ == 0) ||
+        (start_id_ <= request_id && request_id < end_id_)) {
       {
-        std::lock_guard<std::mutex> lk(m_);
+        std::lock_guard<std::mutex> lock(m_);
         if (peer.empty()) {
           no_remote_peer_++;
+          ++no_remote_peer_by_type_[rpc_type];
         } else {
+          // RPC is counted into both per-peer bin and per-method-per-peer bin.
           rpcs_by_peer_[peer]++;
-          rpcs_by_method_[rpc_method][peer]++;
+          rpcs_by_type_[rpc_type][peer]++;
         }
         rpcs_needed_--;
       }
@@ -108,18 +142,28 @@ class XdsStatsWatcher {
   void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
                                int timeout_sec) {
     {
-      std::unique_lock<std::mutex> lk(m_);
-      cv_.wait_for(lk, std::chrono::seconds(timeout_sec),
+      std::unique_lock<std::mutex> lock(m_);
+      cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
                    [this] { return rpcs_needed_ == 0; });
       response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
                                                rpcs_by_peer_.end());
       auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
-      for (const auto& rpc_by_method : rpcs_by_method_) {
-        auto& response_rpc_by_method =
-            response_rpcs_by_method[rpc_by_method.first];
+      for (const auto& rpc_by_type : rpcs_by_type_) {
+        std::string method_name;
+        if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
+          method_name = "EmptyCall";
+        } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
+          method_name = "UnaryCall";
+        } else {
+          GPR_ASSERT(0);
+        }
+        // TODO(@donnadionne): When the test runner changes to accept
+        // EMPTY_CALL and UNARY_CALL we will just use the name of the enum
+        // instead of the method_name variable.
+        auto& response_rpc_by_method = response_rpcs_by_method[method_name];
         auto& response_rpcs_by_peer =
             *response_rpc_by_method.mutable_rpcs_by_peer();
-        for (const auto& rpc_by_peer : rpc_by_method.second) {
+        for (const auto& rpc_by_peer : rpc_by_type.second) {
           auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
           response_rpc_by_peer = rpc_by_peer.second;
         }
@@ -128,43 +172,76 @@ class XdsStatsWatcher {
     }
   }
 
+  void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
+                          StatsWatchers* stats_watchers) {
+    std::unique_lock<std::mutex> lock(m_);
+    auto& response_rpcs_started_by_method =
+        *response->mutable_num_rpcs_started_by_method();
+    auto& response_rpcs_succeeded_by_method =
+        *response->mutable_num_rpcs_succeeded_by_method();
+    auto& response_rpcs_failed_by_method =
+        *response->mutable_num_rpcs_failed_by_method();
+    for (const auto& rpc_by_type : rpcs_by_type_) {
+      auto total_succeeded = 0;
+      for (const auto& rpc_by_peer : rpc_by_type.second) {
+        total_succeeded += rpc_by_peer.second;
+      }
+      response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
+          rpc_by_type.first)] = total_succeeded;
+      response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
+          rpc_by_type.first)] =
+          stats_watchers->global_request_id_by_type[rpc_by_type.first];
+      response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
+          rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
+    }
+  }
+
  private:
   int start_id_;
   int end_id_;
   int rpcs_needed_;
   int no_remote_peer_ = 0;
+  std::map<int, int> no_remote_peer_by_type_;
   // A map of stats keyed by peer name.
   std::map<std::string, int> rpcs_by_peer_;
   // A two-level map of stats keyed at top level by RPC method and second level
   // by peer name.
-  std::map<std::string, std::map<std::string, int>> rpcs_by_method_;
+  std::map<int, std::map<std::string, int>> rpcs_by_type_;
 
   std::mutex m_;
   std::condition_variable cv_;
 };
 
 class TestClient {
  public:
-  TestClient(const std::shared_ptr<Channel>& channel)
-      : stub_(TestService::NewStub(channel)) {}
+  TestClient(const std::shared_ptr<Channel>& channel,
+             StatsWatchers* stats_watchers)
+      : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
 
   void AsyncUnaryCall(
       std::vector<std::pair<std::string, std::string>> metadata) {
     SimpleResponse response;
     int saved_request_id;
     {
-      std::lock_guard<std::mutex> lk(mu);
-      saved_request_id = ++global_request_id;
+      std::lock_guard<std::mutex> lock(stats_watchers_->mu);
+      saved_request_id = ++stats_watchers_->global_request_id;
+      ++stats_watchers_
+            ->global_request_id_by_type[ClientConfigureRequest::UNARY_CALL];
     }
     std::chrono::system_clock::time_point deadline =
         std::chrono::system_clock::now() +
         std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
     AsyncClientCall* call = new AsyncClientCall;
-    call->context.set_deadline(deadline);
     for (const auto& data : metadata) {
       call->context.AddMetadata(data.first, data.second);
+      // TODO(@donnadionne): move deadline to separate proto.
+      if (data.first == "rpc-behavior" && data.second == "keep-open") {
+        deadline =
+            std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
+      }
     }
+    call->context.set_deadline(deadline);
     call->saved_request_id = saved_request_id;
-    call->rpc_method = "UnaryCall";
+    call->rpc_type = ClientConfigureRequest::UNARY_CALL;
     call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
         &call->context, SimpleRequest::default_instance(), &cq_);
     call->simple_response_reader->StartCall();
@@ -177,19 +254,26 @@ class TestClient {
     Empty response;
     int saved_request_id;
     {
-      std::lock_guard<std::mutex> lk(mu);
-      saved_request_id = ++global_request_id;
+      std::lock_guard<std::mutex> lock(stats_watchers_->mu);
+      saved_request_id = ++stats_watchers_->global_request_id;
+      ++stats_watchers_
+            ->global_request_id_by_type[ClientConfigureRequest::EMPTY_CALL];
     }
     std::chrono::system_clock::time_point deadline =
         std::chrono::system_clock::now() +
         std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
     AsyncClientCall* call = new AsyncClientCall;
-    call->context.set_deadline(deadline);
     for (const auto& data : metadata) {
       call->context.AddMetadata(data.first, data.second);
+      // TODO(@donnadionne): move deadline to separate proto.
+      if (data.first == "rpc-behavior" && data.second == "keep-open") {
+        deadline =
+            std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
+      }
     }
+    call->context.set_deadline(deadline);
     call->saved_request_id = saved_request_id;
-    call->rpc_method = "EmptyCall";
+    call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
     call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
         &call->context, Empty::default_instance(), &cq_);
     call->empty_response_reader->StartCall();
@@ -204,7 +288,7 @@ class TestClient {
     AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
     GPR_ASSERT(ok);
     {
-      std::lock_guard<std::mutex> lk(mu);
+      std::lock_guard<std::mutex> lock(stats_watchers_->mu);
       auto server_initial_metadata = call->context.GetServerInitialMetadata();
       auto metadata_hostname =
          call->context.GetServerInitialMetadata().find("hostname");
@@ -213,8 +297,8 @@ class TestClient {
               ? std::string(metadata_hostname->second.data(),
                             metadata_hostname->second.length())
               : call->simple_response.hostname();
-      for (auto watcher : watchers) {
-        watcher->RpcCompleted(call->saved_request_id, call->rpc_method,
+      for (auto watcher : stats_watchers_->watchers) {
+        watcher->RpcCompleted(call->saved_request_id, call->rpc_type,
                               hostname);
       }
     }
@@ -256,18 +340,22 @@ class TestClient {
     ClientContext context;
     Status status;
     int saved_request_id;
-    std::string rpc_method;
+    ClientConfigureRequest::RpcType rpc_type;
     std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
     std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
         simple_response_reader;
   };
 
   std::unique_ptr<TestService::Stub> stub_;
+  StatsWatchers* stats_watchers_;
   CompletionQueue cq_;
 };
 
 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
  public:
+  explicit LoadBalancerStatsServiceImpl(StatsWatchers* stats_watchers)
+      : stats_watchers_(stats_watchers) {}
+
   Status GetClientStats(ServerContext* context,
                         const LoadBalancerStatsRequest* request,
                         LoadBalancerStatsResponse* response) override {
@@ -275,64 +363,104 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
     int end_id;
     XdsStatsWatcher* watcher;
     {
-      std::lock_guard<std::mutex> lk(mu);
-      start_id = global_request_id + 1;
+      std::lock_guard<std::mutex> lock(stats_watchers_->mu);
+      start_id = stats_watchers_->global_request_id + 1;
       end_id = start_id + request->num_rpcs();
       watcher = new XdsStatsWatcher(start_id, end_id);
-      watchers.insert(watcher);
+      stats_watchers_->watchers.insert(watcher);
     }
     watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
     {
-      std::lock_guard<std::mutex> lk(mu);
-      watchers.erase(watcher);
+      std::lock_guard<std::mutex> lock(stats_watchers_->mu);
+      stats_watchers_->watchers.erase(watcher);
     }
     delete watcher;
     return Status::OK;
   }
+
+  Status GetClientAccumulatedStats(
+      ServerContext* context,
+      const LoadBalancerAccumulatedStatsRequest* request,
+      LoadBalancerAccumulatedStatsResponse* response) override {
+    std::lock_guard<std::mutex> lock(stats_watchers_->mu);
+    stats_watchers_->global_watcher->GetCurrentRpcStats(response,
+                                                        stats_watchers_);
+    return Status::OK;
+  }
+
+ private:
+  StatsWatchers* stats_watchers_;
 };
 
-void RunTestLoop(std::chrono::duration<double> duration_per_query) {
-  std::vector<std::string> rpc_methods =
-      absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
-  // Store Metadata like
-  // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
-  // map where the key is the RPC method and value is a vector of key:value
-  // pairs. {EmptyCall, [{key1,value1}],
-  //         UnaryCall, [{key1,value1}, {key2,value2}]}
-  std::vector<std::string> rpc_metadata =
-      absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
-  std::map<std::string, std::vector<std::pair<std::string, std::string>>>
-      metadata_map;
-  for (auto& data : rpc_metadata) {
-    std::vector<std::string> metadata =
-        absl::StrSplit(data, ':', absl::SkipEmpty());
-    GPR_ASSERT(metadata.size() == 3);
-    metadata_map[metadata[0]].push_back({metadata[1], metadata[2]});
+class XdsUpdateClientConfigureServiceImpl
+    : public XdsUpdateClientConfigureService::Service {
+ public:
+  explicit XdsUpdateClientConfigureServiceImpl(
+      RpcConfigurationsQueue* rpc_configs_queue)
+      : rpc_configs_queue_(rpc_configs_queue) {}
+
+  Status Configure(ServerContext* context,
+                   const ClientConfigureRequest* request,
+                   ClientConfigureResponse* response) override {
+    std::map<int, std::vector<std::pair<std::string, std::string>>>
+        metadata_map;
+    for (const auto& data : request->metadata()) {
+      metadata_map[data.type()].push_back({data.key(), data.value()});
+    }
+    std::vector<RpcConfig> configs;
+    for (const auto& rpc : request->types()) {
+      RpcConfig config;
+      config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
+      auto metadata_iter = metadata_map.find(rpc);
+      if (metadata_iter != metadata_map.end()) {
+        config.metadata = metadata_iter->second;
+      }
+      configs.push_back(std::move(config));
+    }
+    {
+      std::lock_guard<std::mutex> lock(
+          rpc_configs_queue_->mu_rpc_configs_queue);
+      rpc_configs_queue_->rpc_configs_queue.emplace_back(std::move(configs));
+    }
+    return Status::OK;
   }
+
+ private:
+  RpcConfigurationsQueue* rpc_configs_queue_;
+};
+
+void RunTestLoop(std::chrono::duration<double> duration_per_query,
+                 StatsWatchers* stats_watchers,
+                 RpcConfigurationsQueue* rpc_configs_queue) {
   TestClient client(grpc::CreateChannel(absl::GetFlag(FLAGS_server),
-                                        grpc::InsecureChannelCredentials()));
+                                        grpc::InsecureChannelCredentials()),
+                    stats_watchers);
   std::chrono::time_point<std::chrono::system_clock> start =
       std::chrono::system_clock::now();
   std::chrono::duration<double> elapsed;
 
   std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
 
+  std::vector<RpcConfig> configs;
   while (true) {
-    for (const std::string& rpc_method : rpc_methods) {
+    {
+      std::lock_guard<std::mutex> lock(
+          rpc_configs_queue->mu_rpc_configs_queue);
+      if (!rpc_configs_queue->rpc_configs_queue.empty()) {
+        configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
+        rpc_configs_queue->rpc_configs_queue.pop_front();
+      }
+    }
+    for (const auto& config : configs) {
       elapsed = std::chrono::system_clock::now() - start;
       if (elapsed > duration_per_query) {
         start = std::chrono::system_clock::now();
-        auto metadata_iter = metadata_map.find(rpc_method);
-        if (rpc_method == "EmptyCall") {
-          client.AsyncEmptyCall(
-              metadata_iter != metadata_map.end()
-                  ? metadata_iter->second
-                  : std::vector<std::pair<std::string, std::string>>());
+        if (config.type == ClientConfigureRequest::EMPTY_CALL) {
+          client.AsyncEmptyCall(config.metadata);
+        } else if (config.type == ClientConfigureRequest::UNARY_CALL) {
+          client.AsyncUnaryCall(config.metadata);
         } else {
-          client.AsyncUnaryCall(
-              metadata_iter != metadata_map.end()
-                  ? metadata_iter->second
-                  : std::vector<std::pair<std::string, std::string>>());
+          GPR_ASSERT(0);
         }
       }
     }
@@ -340,40 +468,100 @@ void RunTestLoop(std::chrono::duration<double> duration_per_query) {
   thread.join();
 }
 
-void RunServer(const int port) {
+void RunServer(const int port, StatsWatchers* stats_watchers,
+               RpcConfigurationsQueue* rpc_configs_queue) {
   GPR_ASSERT(port != 0);
   std::ostringstream server_address;
   server_address << "0.0.0.0:" << port;
 
-  LoadBalancerStatsServiceImpl service;
+  LoadBalancerStatsServiceImpl stats_service(stats_watchers);
+  XdsUpdateClientConfigureServiceImpl client_config_service(rpc_configs_queue);
 
   ServerBuilder builder;
-  builder.RegisterService(&service);
+  builder.RegisterService(&stats_service);
+  builder.RegisterService(&client_config_service);
   builder.AddListeningPort(server_address.str(),
                            grpc::InsecureServerCredentials());
   std::unique_ptr<Server> server(builder.BuildAndStart());
-  gpr_log(GPR_INFO, "Stats server listening on %s",
-          server_address.str().c_str());
+  gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str());
 
   server->Wait();
 }
 
+void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
+  // Store Metadata like
+  // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
+  // map where the key is the RPC method and value is a vector of key:value
+  // pairs. {EmptyCall, [{key1,value1}],
+  //         UnaryCall, [{key1,value1}, {key2,value2}]}
+  std::vector<std::string> rpc_metadata =
+      absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
+  std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
+  for (auto& data : rpc_metadata) {
+    std::vector<std::string> metadata =
+        absl::StrSplit(data, ':', absl::SkipEmpty());
+    GPR_ASSERT(metadata.size() == 3);
+    if (metadata[0] == "EmptyCall") {
+      metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
+          {metadata[1], metadata[2]});
+    } else if (metadata[0] == "UnaryCall") {
+      metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
+          {metadata[1], metadata[2]});
+    } else {
+      GPR_ASSERT(0);
+    }
+  }
+  std::vector<RpcConfig> configs;
+  std::vector<std::string> rpc_methods =
+      absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
+  for (const std::string& rpc_method : rpc_methods) {
+    RpcConfig config;
+    if (rpc_method == "EmptyCall") {
+      config.type = ClientConfigureRequest::EMPTY_CALL;
+    } else if (rpc_method == "UnaryCall") {
+      config.type = ClientConfigureRequest::UNARY_CALL;
+    } else {
+      GPR_ASSERT(0);
+    }
+    auto metadata_iter = metadata_map.find(config.type);
+    if (metadata_iter != metadata_map.end()) {
+      config.metadata = metadata_iter->second;
+    }
+    configs.push_back(std::move(config));
+  }
+  {
+    std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
+    rpc_configs_queue->rpc_configs_queue.emplace_back(std::move(configs));
+  }
+}
+
 int main(int argc, char** argv) {
   grpc::testing::TestEnvironment env(argc, argv);
   grpc::testing::InitTest(&argc, &argv, true);
 
+  StatsWatchers stats_watchers;
+  RpcConfigurationsQueue rpc_config_queue;
+
+  {
+    std::lock_guard<std::mutex> lock(stats_watchers.mu);
+    stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
+    stats_watchers.watchers.insert(stats_watchers.global_watcher);
+  }
+
+  BuildRpcConfigsFromFlags(&rpc_config_queue);
+
   std::chrono::duration<double> duration_per_query =
       std::chrono::nanoseconds(std::chrono::seconds(1)) /
       absl::GetFlag(FLAGS_qps);
 
   std::vector<std::thread> test_threads;
-  test_threads.reserve(absl::GetFlag(FLAGS_num_channels));
   for (int i = 0; i < absl::GetFlag(FLAGS_num_channels); i++) {
-    test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query));
+    test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query,
+                                          &stats_watchers, &rpc_config_queue));
   }
 
-  RunServer(absl::GetFlag(FLAGS_stats_port));
+  RunServer(absl::GetFlag(FLAGS_stats_port), &stats_watchers,
+            &rpc_config_queue);
 
   for (auto it = test_threads.begin(); it != test_threads.end(); it++) {
     it->join();
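
Note: the client-side changes above replace the flag-driven RPC configuration
with one that can be swapped at runtime. Configure() pushes a vector of
RpcConfig onto the queue, RunTestLoop picks it up at the top of its next pass
(so an in-flight RPC is never interrupted), and GetClientAccumulatedStats()
exposes the per-method started/succeeded/failed counters. A minimal,
hypothetical driver for the new service is sketched below; it is not part of
this patch, and the target "localhost:50052" (standing in for the client's
--stats_port) and the chosen metadata are assumptions for illustration:

    #include <grpcpp/grpcpp.h>

    #include "src/proto/grpc/testing/messages.pb.h"
    #include "src/proto/grpc/testing/test.grpc.pb.h"

    using grpc::testing::ClientConfigureRequest;
    using grpc::testing::ClientConfigureResponse;
    using grpc::testing::LoadBalancerAccumulatedStatsRequest;
    using grpc::testing::LoadBalancerAccumulatedStatsResponse;
    using grpc::testing::LoadBalancerStatsService;
    using grpc::testing::XdsUpdateClientConfigureService;

    int main() {
      // Assumed stats-port address of a running xds_interop_client.
      auto channel = grpc::CreateChannel("localhost:50052",
                                         grpc::InsecureChannelCredentials());
      // Ask the running client to send only EMPTY_CALL RPCs, each carrying
      // one metadata entry.
      ClientConfigureRequest request;
      request.add_types(ClientConfigureRequest::EMPTY_CALL);
      auto* md = request.add_metadata();
      md->set_type(ClientConfigureRequest::EMPTY_CALL);
      md->set_key("rpc-behavior");
      md->set_value("keep-open");
      ClientConfigureResponse response;
      grpc::ClientContext configure_ctx;
      auto configure_stub = XdsUpdateClientConfigureService::NewStub(channel);
      grpc::Status status =
          configure_stub->Configure(&configure_ctx, request, &response);
      if (!status.ok()) return 1;
      // Read back the accumulated per-method counters.
      LoadBalancerAccumulatedStatsRequest stats_request;
      LoadBalancerAccumulatedStatsResponse stats_response;
      grpc::ClientContext stats_ctx;
      auto stats_stub = LoadBalancerStatsService::NewStub(channel);
      status = stats_stub->GetClientAccumulatedStats(&stats_ctx, stats_request,
                                                     &stats_response);
      return status.ok() ? 0 : 1;
    }

The test-runner side of this protocol is exercised by the changes to
run_xds_tests.py below.
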
diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py
index 1fca4de4688..4802e96957b 100755
--- a/tools/run_tests/run_xds_tests.py
+++ b/tools/run_tests/run_xds_tests.py
@@ -431,6 +431,7 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold):
         error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold,
                                           threshold_fraction)
         if error_msg:
+            logger.debug('Progress: %s', error_msg)
             time.sleep(2)
         else:
             break
@@ -1195,11 +1196,15 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
             'UNARY_CALL',
             (_WAIT_FOR_BACKEND_SEC +
              int(extra_backend_service_max_requests / args.qps)),
             extra_backend_service_max_requests, 1)
+        logger.info('UNARY_CALL reached stable state (%d)',
+                    extra_backend_service_max_requests)
         wait_until_rpcs_in_flight(
             'EMPTY_CALL',
             (_WAIT_FOR_BACKEND_SEC +
              int(more_extra_backend_service_max_requests / args.qps)),
             more_extra_backend_service_max_requests, 1)
+        logger.info('EMPTY_CALL reached stable state (%d)',
+                    more_extra_backend_service_max_requests)
 
         # Increment circuit breakers max_requests threshold.
         extra_backend_service_max_requests = 800
@@ -1213,6 +1218,9 @@ def test_circuit_breaking(gcp, original_backend_service, instance_group,
             'UNARY_CALL',
             (_WAIT_FOR_BACKEND_SEC +
              int(extra_backend_service_max_requests / args.qps)),
             extra_backend_service_max_requests, 1)
+        logger.info('UNARY_CALL reached stable state after increase (%d)',
+                    extra_backend_service_max_requests)
+        logger.info('success')
     finally:
         patch_url_map_backend_service(gcp, original_backend_service)
         patch_backend_service(gcp, original_backend_service, [instance_group])
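
The circuit-breaking assertions rely on inferring in-flight RPCs from the
accumulated counters: a method's in-flight count is its started count minus
its succeeded and failed counts, which is why the "keep-open" behavior makes
that count climb toward the configured max_requests threshold. A minimal
helper expressing the same arithmetic as _check_rpcs_in_flight, assuming a
LoadBalancerAccumulatedStatsResponse fetched as in the earlier sketch (the
helper names here are hypothetical):

    #include <cstdint>
    #include <string>

    #include <google/protobuf/map.h>

    #include "src/proto/grpc/testing/messages.pb.h"

    // Hypothetical helper: look up one method's counter, treating a missing
    // entry as zero.
    int GetCount(const google::protobuf::Map<std::string, int32_t>& by_method,
                 const std::string& method) {
      auto it = by_method.find(method);
      return it == by_method.end() ? 0 : it->second;
    }

    // Hypothetical helper: RPCs of a given method that were started but have
    // neither succeeded nor failed yet. `method` is the RpcType enum name as
    // reported by the client, e.g. "UNARY_CALL" or "EMPTY_CALL".
    int RpcsInFlight(
        const grpc::testing::LoadBalancerAccumulatedStatsResponse& stats,
        const std::string& method) {
      return GetCount(stats.num_rpcs_started_by_method(), method) -
             GetCount(stats.num_rpcs_succeeded_by_method(), method) -
             GetCount(stats.num_rpcs_failed_by_method(), method);
    }
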