From b2443df1e197000015fca5f23dbc110afed4ec1d Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Thu, 21 Jan 2021 15:23:02 -0800 Subject: [PATCH] xds_interop_client.cc modification: new stats and new timeout_sec config --- src/proto/grpc/testing/messages.proto | 25 ++++- test/cpp/interop/xds_interop_client.cc | 129 ++++++++++++++----------- 2 files changed, 95 insertions(+), 59 deletions(-) diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index 5acc2f0eadb..cfb1222ac82 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -219,11 +219,27 @@ message LoadBalancerAccumulatedStatsRequest {} // Accumulated stats for RPCs sent by a test client. message LoadBalancerAccumulatedStatsResponse { // The total number of RPCs have ever issued for each type. - map num_rpcs_started_by_method = 1; + // Deprecated: use stats_per_method.rpcs_started instead. + map num_rpcs_started_by_method = 1 [deprecated = true]; // The total number of RPCs have ever completed successfully for each type. - map num_rpcs_succeeded_by_method = 2; + // Deprecated: use stats_per_method.result instead. + map num_rpcs_succeeded_by_method = 2 [deprecated = true]; // The total number of RPCs have ever failed for each type. - map num_rpcs_failed_by_method = 3; + // Deprecated: use stats_per_method.result instead. + map num_rpcs_failed_by_method = 3 [deprecated = true]; + + message MethodStats { + // The number of RPCs that were started for this method. + int32 rpcs_started = 1; + + // The number of RPCs that completed with each status for this method. The + // key is the integral value of a google.rpc.Code; the value is the count. + map result = 2; + } + + // Per-method RPC statistics. The key is the full method path; i.e. + // "/proto.package.ServiceName/MethodName". + map stats_per_method = 4; } // Configurations for a test client. @@ -245,6 +261,9 @@ message ClientConfigureRequest { repeated RpcType types = 1; // The collection of custom metadata to be attached to RPCs sent by the client. repeated Metadata metadata = 2; + // The deadline to use, in seconds, for all RPCs. If unset or zero, the + // client will use the default from the command-line. + int32 timeout_sec = 3; } // Response for updating a test client's configuration. diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index 5439046afa2..e0dad44b782 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -103,6 +103,7 @@ std::atomic one_rpc_succeeded(false); struct RpcConfig { ClientConfigureRequest::RpcType type; std::vector> metadata; + int timeout_sec = 0; }; struct RpcConfigurationsQueue { // A queue of RPC configurations detailing how RPCs should be sent. @@ -110,6 +111,17 @@ struct RpcConfigurationsQueue { // Mutex for rpc_configs_queue std::mutex mu_rpc_configs_queue; }; +struct AsyncClientCall { + Empty empty_response; + SimpleResponse simple_response; + ClientContext context; + Status status; + int saved_request_id; + ClientConfigureRequest::RpcType rpc_type; + std::unique_ptr> empty_response_reader; + std::unique_ptr> + simple_response_reader; +}; /** Records the remote peer distribution for a given range of RPCs. */ class XdsStatsWatcher { @@ -120,24 +132,35 @@ class XdsStatsWatcher { // Upon the completion of an RPC, we will look at the request_id, the // rpc_type, and the peer the RPC was sent to in order to count // this RPC into the right stats bin. - void RpcCompleted(int request_id, - const ClientConfigureRequest::RpcType rpc_type, - const std::string& peer) { + void RpcCompleted(AsyncClientCall* call, const std::string& peer) { // We count RPCs for global watcher or if the request_id falls into the // watcher's interested range of request ids. if ((start_id_ == 0 && end_id_ == 0) || - (start_id_ <= request_id && request_id < end_id_)) { + (start_id_ <= call->saved_request_id && + call->saved_request_id < end_id_)) { { std::lock_guard lock(m_); if (peer.empty()) { no_remote_peer_++; - ++no_remote_peer_by_type_[rpc_type]; + ++no_remote_peer_by_type_[call->rpc_type]; } else { // RPC is counted into both per-peer bin and per-method-per-peer bin. rpcs_by_peer_[peer]++; - rpcs_by_type_[rpc_type][peer]++; + rpcs_by_type_[call->rpc_type][peer]++; } rpcs_needed_--; + // Report accumulated stats. + auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method(); + auto& method_stat = + stats_per_method[ClientConfigureRequest_RpcType_Name( + call->rpc_type)]; + auto& result = *method_stat.mutable_result(); + grpc_status_code code = + static_cast(call->status.error_code()); + auto& num_rpcs = result[code]; + ++num_rpcs; + auto rpcs_started = method_stat.rpcs_started(); + method_stat.set_rpcs_started(++rpcs_started); } cv_.notify_one(); } @@ -145,40 +168,41 @@ class XdsStatsWatcher { void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response, int timeout_sec) { - { - std::unique_lock lock(m_); - cv_.wait_for(lock, std::chrono::seconds(timeout_sec), - [this] { return rpcs_needed_ == 0; }); - response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), - rpcs_by_peer_.end()); - auto& response_rpcs_by_method = *response->mutable_rpcs_by_method(); - for (const auto& rpc_by_type : rpcs_by_type_) { - std::string method_name; - if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) { - method_name = "EmptyCall"; - } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) { - method_name = "UnaryCall"; - } else { - GPR_ASSERT(0); - } - // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL - // and UNARY_CALL we will just use the name of the enum instead of the - // method_name variable. - auto& response_rpc_by_method = response_rpcs_by_method[method_name]; - auto& response_rpcs_by_peer = - *response_rpc_by_method.mutable_rpcs_by_peer(); - for (const auto& rpc_by_peer : rpc_by_type.second) { - auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first]; - response_rpc_by_peer = rpc_by_peer.second; - } + std::unique_lock lock(m_); + cv_.wait_for(lock, std::chrono::seconds(timeout_sec), + [this] { return rpcs_needed_ == 0; }); + response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), + rpcs_by_peer_.end()); + auto& response_rpcs_by_method = *response->mutable_rpcs_by_method(); + for (const auto& rpc_by_type : rpcs_by_type_) { + std::string method_name; + if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) { + method_name = "EmptyCall"; + } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) { + method_name = "UnaryCall"; + } else { + GPR_ASSERT(0); + } + // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL + // and UNARY_CALL we will just use the name of the enum instead of the + // method_name variable. + auto& response_rpc_by_method = response_rpcs_by_method[method_name]; + auto& response_rpcs_by_peer = + *response_rpc_by_method.mutable_rpcs_by_peer(); + for (const auto& rpc_by_peer : rpc_by_type.second) { + auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first]; + response_rpc_by_peer = rpc_by_peer.second; } - response->set_num_failures(no_remote_peer_ + rpcs_needed_); } + response->set_num_failures(no_remote_peer_ + rpcs_needed_); } void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, StatsWatchers* stats_watchers) { std::unique_lock lock(m_); + response->CopyFrom(accumulated_stats_); + // TODO(@donnadionne): delete deprecated stats below when the test is no + // longer using them. auto& response_rpcs_started_by_method = *response->mutable_num_rpcs_started_by_method(); auto& response_rpcs_succeeded_by_method = @@ -211,6 +235,8 @@ class XdsStatsWatcher { // A two-level map of stats keyed at top level by RPC method and second level // by peer name. std::map> rpcs_by_type_; + // Storing accumulated stats in the response proto format. + LoadBalancerAccumulatedStatsResponse accumulated_stats_; std::mutex m_; std::condition_variable cv_; }; @@ -221,8 +247,7 @@ class TestClient { StatsWatchers* stats_watchers) : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {} - void AsyncUnaryCall( - std::vector> metadata) { + void AsyncUnaryCall(const RpcConfig& config) { SimpleResponse response; int saved_request_id; { @@ -233,9 +258,11 @@ class TestClient { } std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + - std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec)); + std::chrono::seconds(config.timeout_sec != 0 + ? config.timeout_sec + : absl::GetFlag(FLAGS_rpc_timeout_sec)); AsyncClientCall* call = new AsyncClientCall; - for (const auto& data : metadata) { + for (const auto& data : config.metadata) { call->context.AddMetadata(data.first, data.second); // TODO(@donnadionne): move deadline to separate proto. if (data.first == "rpc-behavior" && data.second == "keep-open") { @@ -253,8 +280,7 @@ class TestClient { call); } - void AsyncEmptyCall( - std::vector> metadata) { + void AsyncEmptyCall(const RpcConfig& config) { Empty response; int saved_request_id; { @@ -265,9 +291,11 @@ class TestClient { } std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + - std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec)); + std::chrono::seconds(config.timeout_sec != 0 + ? config.timeout_sec + : absl::GetFlag(FLAGS_rpc_timeout_sec)); AsyncClientCall* call = new AsyncClientCall; - for (const auto& data : metadata) { + for (const auto& data : config.metadata) { call->context.AddMetadata(data.first, data.second); // TODO(@donnadionne): move deadline to separate proto. if (data.first == "rpc-behavior" && data.second == "keep-open") { @@ -302,8 +330,7 @@ class TestClient { metadata_hostname->second.length()) : call->simple_response.hostname(); for (auto watcher : stats_watchers_->watchers) { - watcher->RpcCompleted(call->saved_request_id, call->rpc_type, - hostname); + watcher->RpcCompleted(call, hostname); } } @@ -338,17 +365,6 @@ class TestClient { } private: - struct AsyncClientCall { - Empty empty_response; - SimpleResponse simple_response; - ClientContext context; - Status status; - int saved_request_id; - ClientConfigureRequest::RpcType rpc_type; - std::unique_ptr> empty_response_reader; - std::unique_ptr> - simple_response_reader; - }; static bool RpcStatusCheckSuccess(AsyncClientCall* call) { // Determine RPC success based on expected status. grpc_status_code code; @@ -421,6 +437,7 @@ class XdsUpdateClientConfigureServiceImpl std::vector configs; for (const auto& rpc : request->types()) { RpcConfig config; + config.timeout_sec = request->timeout_sec(); config.type = static_cast(rpc); auto metadata_iter = metadata_map.find(rpc); if (metadata_iter != metadata_map.end()) { @@ -468,9 +485,9 @@ void RunTestLoop(std::chrono::duration duration_per_query, start = std::chrono::system_clock::now(); for (const auto& config : configs) { if (config.type == ClientConfigureRequest::EMPTY_CALL) { - client.AsyncEmptyCall(config.metadata); + client.AsyncEmptyCall(config); } else if (config.type == ClientConfigureRequest::UNARY_CALL) { - client.AsyncUnaryCall(config.metadata); + client.AsyncUnaryCall(config); } else { GPR_ASSERT(0); }