From 627f5d5f4756c2886d5556a8b11ba985b200a344 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Mon, 2 Mar 2020 18:30:57 -0800 Subject: [PATCH] xds interop client: use async api and initialize an int fixes two bugs with the client, with an uninitialized int and where the "timer" used to approximate a QPS would block waiting for a synchronous RPC to timeout, which dramatically limited the QPS. --- test/cpp/interop/xds_interop_client.cc | 65 +++++++++++++++++++------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index 7d6233de5e8..30811267a90 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -47,7 +47,9 @@ DEFINE_int32(stats_port, 50052, "Port to expose peer distribution stats service."); using grpc::Channel; +using grpc::ClientAsyncResponseReader; using grpc::ClientContext; +using grpc::CompletionQueue; using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; @@ -109,8 +111,8 @@ class XdsStatsWatcher { int start_id_; int end_id_; int rpcs_needed_; + int no_remote_peer_ = 0; std::map rpcs_by_peer_; - int no_remote_peer_; std::mutex m_; std::condition_variable cv_; }; @@ -120,7 +122,7 @@ class TestClient { TestClient(const std::shared_ptr& channel) : stub_(TestService::NewStub(channel)) {} - void UnaryCall() { + void AsyncUnaryCall() { SimpleResponse response; ClientContext context; @@ -133,29 +135,55 @@ class TestClient { std::chrono::system_clock::now() + std::chrono::seconds(FLAGS_rpc_timeout_sec); context.set_deadline(deadline); - Status status = stub_->UnaryCall( - &context, SimpleRequest::default_instance(), &response); - { - std::lock_guard lk(mu); - for (auto watcher : watchers) { - watcher->RpcCompleted(saved_request_id, response.hostname()); + AsyncClientCall* call = new AsyncClientCall; + call->saved_request_id = saved_request_id; + call->response_reader = stub_->PrepareAsyncUnaryCall( + &call->context, SimpleRequest::default_instance(), &cq_); + call->response_reader->StartCall(); + call->response_reader->Finish(&call->response, &call->status, (void*)call); + } + + void AsyncCompleteRpc() { + void* got_tag; + bool ok = false; + while (cq_.Next(&got_tag, &ok)) { + AsyncClientCall* call = static_cast(got_tag); + GPR_ASSERT(ok); + { + std::lock_guard lk(mu); + for (auto watcher : watchers) { + watcher->RpcCompleted(call->saved_request_id, + call->response.hostname()); + } } - } - if (FLAGS_print_response) { - if (status.ok()) { - std::cout << "Greeting: Hello world, this is " << response.hostname() - << ", from " << context.peer() << std::endl; - } else { - std::cout << "RPC failed: " << status.error_code() << ": " - << status.error_message() << std::endl; + if (FLAGS_print_response) { + if (call->status.ok()) { + std::cout << "Greeting: Hello world, this is " + << call->response.hostname() << ", from " + << call->context.peer() << std::endl; + } else { + std::cout << "RPC failed: " << call->status.error_code() << ": " + << call->status.error_message() << std::endl; + } } + + delete call; } } private: + struct AsyncClientCall { + SimpleResponse response; + ClientContext context; + Status status; + int saved_request_id; + std::unique_ptr> response_reader; + }; + std::unique_ptr stub_; + CompletionQueue cq_; }; class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service { @@ -191,13 +219,16 @@ void RunTestLoop(const std::string& server, std::chrono::system_clock::now(); std::chrono::duration elapsed; + std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client); + while (true) { elapsed = std::chrono::system_clock::now() - start; if (elapsed > duration_per_query) { start = std::chrono::system_clock::now(); - client.UnaryCall(); + client.AsyncUnaryCall(); } } + thread.join(); } void RunServer(const int port) {