Merge pull request #25193 from donnadionne/to_interop

xds_interop_client.cc modification: new stats and new timeout_sec config
pull/25245/head
donnadionne 4 years ago committed by GitHub
commit 191431ab03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      src/proto/grpc/testing/messages.proto
  2. 79
      test/cpp/interop/xds_interop_client.cc

@ -219,11 +219,27 @@ message LoadBalancerAccumulatedStatsRequest {}
// Accumulated stats for RPCs sent by a test client.
message LoadBalancerAccumulatedStatsResponse {
// The total number of RPCs have ever issued for each type.
map<string, int32> num_rpcs_started_by_method = 1;
// Deprecated: use stats_per_method.rpcs_started instead.
map<string, int32> num_rpcs_started_by_method = 1 [deprecated = true];
// The total number of RPCs have ever completed successfully for each type.
map<string, int32> num_rpcs_succeeded_by_method = 2;
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_succeeded_by_method = 2 [deprecated = true];
// The total number of RPCs have ever failed for each type.
map<string, int32> num_rpcs_failed_by_method = 3;
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_failed_by_method = 3 [deprecated = true];
message MethodStats {
// The number of RPCs that were started for this method.
int32 rpcs_started = 1;
// The number of RPCs that completed with each status for this method. The
// key is the integral value of a google.rpc.Code; the value is the count.
map<int32, int32> result = 2;
}
// Per-method RPC statistics. The key is the full method path; i.e.
// "/proto.package.ServiceName/MethodName".
map<string, MethodStats> stats_per_method = 4;
}
// Configurations for a test client.
@ -245,6 +261,9 @@ message ClientConfigureRequest {
repeated RpcType types = 1;
// The collection of custom metadata to be attached to RPCs sent by the client.
repeated Metadata metadata = 2;
// The deadline to use, in seconds, for all RPCs. If unset or zero, the
// client will use the default from the command-line.
int32 timeout_sec = 3;
}
// Response for updating a test client's configuration.

@ -103,6 +103,7 @@ std::atomic<bool> one_rpc_succeeded(false);
struct RpcConfig {
ClientConfigureRequest::RpcType type;
std::vector<std::pair<std::string, std::string>> metadata;
int timeout_sec = 0;
};
struct RpcConfigurationsQueue {
// A queue of RPC configurations detailing how RPCs should be sent.
@ -110,6 +111,17 @@ struct RpcConfigurationsQueue {
// Mutex for rpc_configs_queue
std::mutex mu_rpc_configs_queue;
};
struct AsyncClientCall {
Empty empty_response;
SimpleResponse simple_response;
ClientContext context;
Status status;
int saved_request_id;
ClientConfigureRequest::RpcType rpc_type;
std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
simple_response_reader;
};
/** Records the remote peer distribution for a given range of RPCs. */
class XdsStatsWatcher {
@ -120,24 +132,35 @@ class XdsStatsWatcher {
// Upon the completion of an RPC, we will look at the request_id, the
// rpc_type, and the peer the RPC was sent to in order to count
// this RPC into the right stats bin.
void RpcCompleted(int request_id,
const ClientConfigureRequest::RpcType rpc_type,
const std::string& peer) {
void RpcCompleted(AsyncClientCall* call, const std::string& peer) {
// We count RPCs for global watcher or if the request_id falls into the
// watcher's interested range of request ids.
if ((start_id_ == 0 && end_id_ == 0) ||
(start_id_ <= request_id && request_id < end_id_)) {
(start_id_ <= call->saved_request_id &&
call->saved_request_id < end_id_)) {
{
std::lock_guard<std::mutex> lock(m_);
if (peer.empty()) {
no_remote_peer_++;
++no_remote_peer_by_type_[rpc_type];
++no_remote_peer_by_type_[call->rpc_type];
} else {
// RPC is counted into both per-peer bin and per-method-per-peer bin.
rpcs_by_peer_[peer]++;
rpcs_by_type_[rpc_type][peer]++;
rpcs_by_type_[call->rpc_type][peer]++;
}
rpcs_needed_--;
// Report accumulated stats.
auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
auto& method_stat =
stats_per_method[ClientConfigureRequest_RpcType_Name(
call->rpc_type)];
auto& result = *method_stat.mutable_result();
grpc_status_code code =
static_cast<grpc_status_code>(call->status.error_code());
auto& num_rpcs = result[code];
++num_rpcs;
auto rpcs_started = method_stat.rpcs_started();
method_stat.set_rpcs_started(++rpcs_started);
}
cv_.notify_one();
}
@ -145,7 +168,6 @@ class XdsStatsWatcher {
void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
int timeout_sec) {
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
[this] { return rpcs_needed_ == 0; });
@ -174,11 +196,13 @@ class XdsStatsWatcher {
}
response->set_num_failures(no_remote_peer_ + rpcs_needed_);
}
}
void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
StatsWatchers* stats_watchers) {
std::unique_lock<std::mutex> lock(m_);
response->CopyFrom(accumulated_stats_);
// TODO(@donnadionne): delete deprecated stats below when the test is no
// longer using them.
auto& response_rpcs_started_by_method =
*response->mutable_num_rpcs_started_by_method();
auto& response_rpcs_succeeded_by_method =
@ -211,6 +235,8 @@ class XdsStatsWatcher {
// A two-level map of stats keyed at top level by RPC method and second level
// by peer name.
std::map<int, std::map<std::string, int>> rpcs_by_type_;
// Storing accumulated stats in the response proto format.
LoadBalancerAccumulatedStatsResponse accumulated_stats_;
std::mutex m_;
std::condition_variable cv_;
};
@ -221,8 +247,7 @@ class TestClient {
StatsWatchers* stats_watchers)
: stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
void AsyncUnaryCall(
std::vector<std::pair<std::string, std::string>> metadata) {
void AsyncUnaryCall(const RpcConfig& config) {
SimpleResponse response;
int saved_request_id;
{
@ -233,9 +258,11 @@ class TestClient {
}
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
std::chrono::seconds(config.timeout_sec != 0
? config.timeout_sec
: absl::GetFlag(FLAGS_rpc_timeout_sec));
AsyncClientCall* call = new AsyncClientCall;
for (const auto& data : metadata) {
for (const auto& data : config.metadata) {
call->context.AddMetadata(data.first, data.second);
// TODO(@donnadionne): move deadline to separate proto.
if (data.first == "rpc-behavior" && data.second == "keep-open") {
@ -253,8 +280,7 @@ class TestClient {
call);
}
void AsyncEmptyCall(
std::vector<std::pair<std::string, std::string>> metadata) {
void AsyncEmptyCall(const RpcConfig& config) {
Empty response;
int saved_request_id;
{
@ -265,9 +291,11 @@ class TestClient {
}
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
std::chrono::seconds(config.timeout_sec != 0
? config.timeout_sec
: absl::GetFlag(FLAGS_rpc_timeout_sec));
AsyncClientCall* call = new AsyncClientCall;
for (const auto& data : metadata) {
for (const auto& data : config.metadata) {
call->context.AddMetadata(data.first, data.second);
// TODO(@donnadionne): move deadline to separate proto.
if (data.first == "rpc-behavior" && data.second == "keep-open") {
@ -302,8 +330,7 @@ class TestClient {
metadata_hostname->second.length())
: call->simple_response.hostname();
for (auto watcher : stats_watchers_->watchers) {
watcher->RpcCompleted(call->saved_request_id, call->rpc_type,
hostname);
watcher->RpcCompleted(call, hostname);
}
}
@ -338,17 +365,6 @@ class TestClient {
}
private:
struct AsyncClientCall {
Empty empty_response;
SimpleResponse simple_response;
ClientContext context;
Status status;
int saved_request_id;
ClientConfigureRequest::RpcType rpc_type;
std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
simple_response_reader;
};
static bool RpcStatusCheckSuccess(AsyncClientCall* call) {
// Determine RPC success based on expected status.
grpc_status_code code;
@ -421,6 +437,7 @@ class XdsUpdateClientConfigureServiceImpl
std::vector<RpcConfig> configs;
for (const auto& rpc : request->types()) {
RpcConfig config;
config.timeout_sec = request->timeout_sec();
config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
auto metadata_iter = metadata_map.find(rpc);
if (metadata_iter != metadata_map.end()) {
@ -468,9 +485,9 @@ void RunTestLoop(std::chrono::duration<double> duration_per_query,
start = std::chrono::system_clock::now();
for (const auto& config : configs) {
if (config.type == ClientConfigureRequest::EMPTY_CALL) {
client.AsyncEmptyCall(config.metadata);
client.AsyncEmptyCall(config);
} else if (config.type == ClientConfigureRequest::UNARY_CALL) {
client.AsyncUnaryCall(config.metadata);
client.AsyncUnaryCall(config);
} else {
GPR_ASSERT(0);
}

Loading…
Cancel
Save