[XDS Interop] Move XdsStatsWatcher to a separate file. (#34000)

This will help with introducing test coverage as the logic becomes more
complex.
pull/33996/head^2
Eugene Ostroukhov 2 years ago committed by GitHub
parent bc41f18beb
commit 18be986e3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 74
      CMakeLists.txt
  2. 28
      build_autogenerated.yaml
  3. 27
      test/cpp/interop/BUILD
  4. 188
      test/cpp/interop/xds_interop_client.cc
  5. 118
      test/cpp/interop/xds_stats_watcher.cc
  6. 104
      test/cpp/interop/xds_stats_watcher.h
  7. 71
      test/cpp/interop/xds_stats_watcher_test.cc
  8. 24
      tools/run_tests/generated/tests.json

74
CMakeLists.txt generated

@ -1456,6 +1456,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_routing_end2end_test)
endif()
add_dependencies(buildtests_cxx xds_stats_watcher_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx xds_wrr_end2end_test)
endif()
@ -29205,6 +29206,7 @@ add_executable(xds_interop_client
src/cpp/server/csds/csds.cc
test/cpp/interop/rpc_behavior_lb_policy.cc
test/cpp/interop/xds_interop_client.cc
test/cpp/interop/xds_stats_watcher.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@ -30888,6 +30890,78 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(xds_stats_watcher_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/base.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/config_dump.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/csds.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/percent.grpc.pb.h
src/cpp/server/admin/admin_services.cc
src/cpp/server/csds/csds.cc
test/cpp/interop/rpc_behavior_lb_policy.cc
test/cpp/interop/xds_stats_watcher.cc
test/cpp/interop/xds_stats_watcher_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(xds_stats_watcher_test PUBLIC cxx_std_14)
target_include_directories(xds_stats_watcher_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(xds_stats_watcher_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_reflection
grpcpp_channelz
grpc_test_util
grpc++_test_config
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -16514,6 +16514,7 @@ targets:
headers:
- src/cpp/server/csds/csds.h
- test/cpp/interop/rpc_behavior_lb_policy.h
- test/cpp/interop/xds_stats_watcher.h
src:
- src/proto/grpc/testing/empty.proto
- src/proto/grpc/testing/messages.proto
@ -16526,6 +16527,7 @@ targets:
- src/cpp/server/csds/csds.cc
- test/cpp/interop/rpc_behavior_lb_policy.cc
- test/cpp/interop/xds_interop_client.cc
- test/cpp/interop/xds_stats_watcher.cc
deps:
- grpc++_reflection
- grpcpp_channelz
@ -17077,6 +17079,32 @@ targets:
- linux
- posix
- mac
- name: xds_stats_watcher_test
gtest: true
build: test
language: c++
headers:
- src/cpp/server/csds/csds.h
- test/cpp/interop/rpc_behavior_lb_policy.h
- test/cpp/interop/xds_stats_watcher.h
src:
- src/proto/grpc/testing/empty.proto
- src/proto/grpc/testing/messages.proto
- src/proto/grpc/testing/test.proto
- src/proto/grpc/testing/xds/v3/base.proto
- src/proto/grpc/testing/xds/v3/config_dump.proto
- src/proto/grpc/testing/xds/v3/csds.proto
- src/proto/grpc/testing/xds/v3/percent.proto
- src/cpp/server/admin/admin_services.cc
- src/cpp/server/csds/csds.cc
- test/cpp/interop/rpc_behavior_lb_policy.cc
- test/cpp/interop/xds_stats_watcher.cc
- test/cpp/interop/xds_stats_watcher_test.cc
deps:
- grpc++_reflection
- grpcpp_channelz
- grpc_test_util
- grpc++_test_config
- name: xds_wrr_end2end_test
gtest: true
build: test

@ -235,6 +235,32 @@ sh_test(
tags = ["no_windows"],
)
grpc_cc_library(
name = "xds_stats_watcher",
srcs = ["xds_stats_watcher.cc"],
hdrs = ["xds_stats_watcher.h"],
deps = [
":rpc_behavior_lb_policy",
"//:grpc++",
"//:grpc++_reflection",
"//:grpcpp_admin",
"//src/proto/grpc/testing:empty_proto",
"//src/proto/grpc/testing:messages_proto",
"//src/proto/grpc/testing:test_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
],
)
grpc_cc_test(
name = "xds_stats_watcher_test",
srcs = [
"xds_stats_watcher_test.cc",
],
external_deps = ["gtest"],
deps = [":xds_stats_watcher"],
)
grpc_cc_binary(
name = "xds_interop_client",
srcs = [
@ -245,6 +271,7 @@ grpc_cc_binary(
],
deps = [
":rpc_behavior_lb_policy",
":xds_stats_watcher",
"//:grpc++",
"//:grpc++_reflection",
"//:grpcpp_admin",

@ -46,6 +46,7 @@
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/interop/rpc_behavior_lb_policy.h"
#include "test/cpp/interop/xds_stats_watcher.h"
#include "test/cpp/util/test_config.h"
ABSL_FLAG(bool, fail_on_failed_rpc, false,
@ -75,8 +76,8 @@ using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::testing::AsyncClientCallResult;
using grpc::testing::ClientConfigureRequest;
using grpc::testing::ClientConfigureRequest_RpcType_Name;
using grpc::testing::ClientConfigureResponse;
using grpc::testing::Empty;
using grpc::testing::LoadBalancerAccumulatedStatsRequest;
@ -86,25 +87,22 @@ using grpc::testing::LoadBalancerStatsResponse;
using grpc::testing::LoadBalancerStatsService;
using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsWatchers;
using grpc::testing::TestService;
using grpc::testing::XdsStatsWatcher;
using grpc::testing::XdsUpdateClientConfigureService;
class XdsStatsWatcher;
struct StatsWatchers {
// Unique ID for each outgoing RPC
int global_request_id = 0;
// Unique ID for each outgoing RPC by RPC method type
std::map<int, int> global_request_id_by_type;
// Stores a set of watchers that should be notified upon outgoing RPC
// completion
std::set<XdsStatsWatcher*> watchers;
// Global watcher for accumululated stats.
XdsStatsWatcher* global_watcher;
// Mutex for global_request_id and watchers
std::mutex mu;
struct AsyncClientCall {
ClientContext context;
std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
simple_response_reader;
AsyncClientCallResult result;
};
// Whether at least one RPC has succeeded, indicating xDS resolution completed.
// Whether at least one RPC has succeeded, indicating xDS resolution
// completed.
std::atomic<bool> one_rpc_succeeded(false);
// RPC configuration detailing how RPC should be sent.
struct RpcConfig {
@ -118,135 +116,6 @@ struct RpcConfigurationsQueue {
// Mutex for rpc_configs_queue
std::mutex mu_rpc_configs_queue;
};
struct AsyncClientCall {
Empty empty_response;
SimpleResponse simple_response;
ClientContext context;
Status status;
int saved_request_id;
ClientConfigureRequest::RpcType rpc_type;
std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
simple_response_reader;
};
/// Records the remote peer distribution for a given range of RPCs.
class XdsStatsWatcher {
public:
XdsStatsWatcher(int start_id, int end_id)
: start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
// Upon the completion of an RPC, we will look at the request_id, the
// rpc_type, and the peer the RPC was sent to in order to count
// this RPC into the right stats bin.
void RpcCompleted(AsyncClientCall* call, const std::string& peer) {
// We count RPCs for global watcher or if the request_id falls into the
// watcher's interested range of request ids.
if ((start_id_ == 0 && end_id_ == 0) ||
(start_id_ <= call->saved_request_id &&
call->saved_request_id < end_id_)) {
{
std::lock_guard<std::mutex> lock(m_);
if (peer.empty()) {
no_remote_peer_++;
++no_remote_peer_by_type_[call->rpc_type];
} else {
// RPC is counted into both per-peer bin and per-method-per-peer bin.
rpcs_by_peer_[peer]++;
rpcs_by_type_[call->rpc_type][peer]++;
}
rpcs_needed_--;
// Report accumulated stats.
auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
auto& method_stat =
stats_per_method[ClientConfigureRequest_RpcType_Name(
call->rpc_type)];
auto& result = *method_stat.mutable_result();
grpc_status_code code =
static_cast<grpc_status_code>(call->status.error_code());
auto& num_rpcs = result[code];
++num_rpcs;
auto rpcs_started = method_stat.rpcs_started();
method_stat.set_rpcs_started(++rpcs_started);
}
cv_.notify_one();
}
}
void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
int timeout_sec) {
std::unique_lock<std::mutex> lock(m_);
cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
[this] { return rpcs_needed_ == 0; });
response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
rpcs_by_peer_.end());
auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
for (const auto& rpc_by_type : rpcs_by_type_) {
std::string method_name;
if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
method_name = "EmptyCall";
} else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
method_name = "UnaryCall";
} else {
GPR_ASSERT(0);
}
// TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
// and UNARY_CALL we will just use the name of the enum instead of the
// method_name variable.
auto& response_rpc_by_method = response_rpcs_by_method[method_name];
auto& response_rpcs_by_peer =
*response_rpc_by_method.mutable_rpcs_by_peer();
for (const auto& rpc_by_peer : rpc_by_type.second) {
auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
response_rpc_by_peer = rpc_by_peer.second;
}
}
response->set_num_failures(no_remote_peer_ + rpcs_needed_);
}
void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
StatsWatchers* stats_watchers) {
std::unique_lock<std::mutex> lock(m_);
response->CopyFrom(accumulated_stats_);
// TODO(@donnadionne): delete deprecated stats below when the test is no
// longer using them.
auto& response_rpcs_started_by_method =
*response->mutable_num_rpcs_started_by_method();
auto& response_rpcs_succeeded_by_method =
*response->mutable_num_rpcs_succeeded_by_method();
auto& response_rpcs_failed_by_method =
*response->mutable_num_rpcs_failed_by_method();
for (const auto& rpc_by_type : rpcs_by_type_) {
auto total_succeeded = 0;
for (const auto& rpc_by_peer : rpc_by_type.second) {
total_succeeded += rpc_by_peer.second;
}
response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] = total_succeeded;
response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] =
stats_watchers->global_request_id_by_type[rpc_by_type.first];
response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
}
}
private:
int start_id_;
int end_id_;
int rpcs_needed_;
int no_remote_peer_ = 0;
std::map<int, int> no_remote_peer_by_type_;
// A map of stats keyed by peer name.
std::map<std::string, int> rpcs_by_peer_;
// A two-level map of stats keyed at top level by RPC method and second level
// by peer name.
std::map<int, std::map<std::string, int>> rpcs_by_type_;
// Storing accumulated stats in the response proto format.
LoadBalancerAccumulatedStatsResponse accumulated_stats_;
std::mutex m_;
std::condition_variable cv_;
};
class TestClient {
public:
@ -278,13 +147,13 @@ class TestClient {
}
}
call->context.set_deadline(deadline);
call->saved_request_id = saved_request_id;
call->rpc_type = ClientConfigureRequest::UNARY_CALL;
call->result.saved_request_id = saved_request_id;
call->result.rpc_type = ClientConfigureRequest::UNARY_CALL;
call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
&call->context, SimpleRequest::default_instance(), &cq_);
call->simple_response_reader->StartCall();
call->simple_response_reader->Finish(&call->simple_response, &call->status,
call);
call->simple_response_reader->Finish(&call->result.simple_response,
&call->result.status, call);
}
void AsyncEmptyCall(const RpcConfig& config) {
@ -311,13 +180,13 @@ class TestClient {
}
}
call->context.set_deadline(deadline);
call->saved_request_id = saved_request_id;
call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
call->result.saved_request_id = saved_request_id;
call->result.rpc_type = ClientConfigureRequest::EMPTY_CALL;
call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
&call->context, Empty::default_instance(), &cq_);
call->empty_response_reader->StartCall();
call->empty_response_reader->Finish(&call->empty_response, &call->status,
call);
call->empty_response_reader->Finish(&call->result.empty_response,
&call->result.status, call);
}
void AsyncCompleteRpc() {
@ -335,17 +204,17 @@ class TestClient {
metadata_hostname != call->context.GetServerInitialMetadata().end()
? std::string(metadata_hostname->second.data(),
metadata_hostname->second.length())
: call->simple_response.hostname();
: call->result.simple_response.hostname();
for (auto watcher : stats_watchers_->watchers) {
watcher->RpcCompleted(call, hostname);
watcher->RpcCompleted(call->result, hostname);
}
}
if (!RpcStatusCheckSuccess(call)) {
if (absl::GetFlag(FLAGS_print_response) ||
absl::GetFlag(FLAGS_fail_on_failed_rpc)) {
std::cout << "RPC failed: " << call->status.error_code() << ": "
<< call->status.error_message() << std::endl;
std::cout << "RPC failed: " << call->result.status.error_code()
<< ": " << call->result.status.error_message() << std::endl;
}
if (absl::GetFlag(FLAGS_fail_on_failed_rpc) &&
one_rpc_succeeded.load()) {
@ -360,7 +229,7 @@ class TestClient {
call->context.GetServerInitialMetadata().end()
? std::string(metadata_hostname->second.data(),
metadata_hostname->second.length())
: call->simple_response.hostname();
: call->result.simple_response.hostname();
std::cout << "Greeting: Hello world, this is " << hostname
<< ", from " << call->context.peer() << std::endl;
}
@ -377,7 +246,8 @@ class TestClient {
grpc_status_code code;
GPR_ASSERT(grpc_status_code_from_string(
absl::GetFlag(FLAGS_expect_status).c_str(), &code));
return code == static_cast<grpc_status_code>(call->status.error_code());
return code ==
static_cast<grpc_status_code>(call->result.status.error_code());
}
std::unique_ptr<TestService::Stub> stub_;

@ -0,0 +1,118 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/cpp/interop/xds_stats_watcher.h"
#include <map>
namespace grpc {
namespace testing {
XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id)
: start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call,
const std::string& peer) {
// We count RPCs for global watcher or if the request_id falls into the
// watcher's interested range of request ids.
if ((start_id_ == 0 && end_id_ == 0) ||
(start_id_ <= call.saved_request_id && call.saved_request_id < end_id_)) {
{
std::lock_guard<std::mutex> lock(m_);
if (peer.empty()) {
no_remote_peer_++;
++no_remote_peer_by_type_[call.rpc_type];
} else {
// RPC is counted into both per-peer bin and per-method-per-peer bin.
rpcs_by_peer_[peer]++;
rpcs_by_type_[call.rpc_type][peer]++;
}
rpcs_needed_--;
// Report accumulated stats.
auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
auto& method_stat =
stats_per_method[ClientConfigureRequest_RpcType_Name(call.rpc_type)];
auto& result = *method_stat.mutable_result();
grpc_status_code code =
static_cast<grpc_status_code>(call.status.error_code());
auto& num_rpcs = result[code];
++num_rpcs;
auto rpcs_started = method_stat.rpcs_started();
method_stat.set_rpcs_started(++rpcs_started);
}
cv_.notify_one();
}
}
void XdsStatsWatcher::WaitForRpcStatsResponse(
LoadBalancerStatsResponse* response, int timeout_sec) {
std::unique_lock<std::mutex> lock(m_);
cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
[this] { return rpcs_needed_ == 0; });
response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
rpcs_by_peer_.end());
auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
for (const auto& rpc_by_type : rpcs_by_type_) {
std::string method_name;
if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
method_name = "EmptyCall";
} else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
method_name = "UnaryCall";
} else {
GPR_ASSERT(0);
}
// TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
// and UNARY_CALL we will just use the name of the enum instead of the
// method_name variable.
auto& response_rpc_by_method = response_rpcs_by_method[method_name];
auto& response_rpcs_by_peer =
*response_rpc_by_method.mutable_rpcs_by_peer();
for (const auto& rpc_by_peer : rpc_by_type.second) {
auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
response_rpc_by_peer = rpc_by_peer.second;
}
}
response->set_num_failures(no_remote_peer_ + rpcs_needed_);
}
void XdsStatsWatcher::GetCurrentRpcStats(
LoadBalancerAccumulatedStatsResponse* response,
StatsWatchers* stats_watchers) {
std::unique_lock<std::mutex> lock(m_);
response->CopyFrom(accumulated_stats_);
// TODO(@donnadionne): delete deprecated stats below when the test is no
// longer using them.
auto& response_rpcs_started_by_method =
*response->mutable_num_rpcs_started_by_method();
auto& response_rpcs_succeeded_by_method =
*response->mutable_num_rpcs_succeeded_by_method();
auto& response_rpcs_failed_by_method =
*response->mutable_num_rpcs_failed_by_method();
for (const auto& rpc_by_type : rpcs_by_type_) {
auto total_succeeded = 0;
for (const auto& rpc_by_peer : rpc_by_type.second) {
total_succeeded += rpc_by_peer.second;
}
response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] = total_succeeded;
response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] =
stats_watchers->global_request_id_by_type[rpc_by_type.first];
response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
}
}
} // namespace testing
} // namespace grpc

@ -0,0 +1,104 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H
#define GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "absl/status/status.h"
#include <grpcpp/grpcpp.h>
#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
namespace grpc {
namespace testing {
class XdsStatsWatcher;
struct AsyncClientCallResult {
Empty empty_response;
SimpleResponse simple_response;
Status status;
int saved_request_id;
ClientConfigureRequest::RpcType rpc_type;
};
struct StatsWatchers {
// Unique ID for each outgoing RPC
int global_request_id = 0;
// Unique ID for each outgoing RPC by RPC method type
std::map<int, int> global_request_id_by_type;
// Stores a set of watchers that should be notified upon outgoing RPC
// completion
std::set<XdsStatsWatcher*> watchers;
// Global watcher for accumululated stats.
XdsStatsWatcher* global_watcher;
// Mutex for global_request_id and watchers
std::mutex mu;
};
/// Records the remote peer distribution for a given range of RPCs.
class XdsStatsWatcher {
public:
XdsStatsWatcher(int start_id, int end_id);
// Upon the completion of an RPC, we will look at the request_id, the
// rpc_type, and the peer the RPC was sent to in order to count
// this RPC into the right stats bin.
void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer);
void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
int timeout_sec);
void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
StatsWatchers* stats_watchers);
private:
int start_id_;
int end_id_;
int rpcs_needed_;
int no_remote_peer_ = 0;
std::map<int, int> no_remote_peer_by_type_;
// A map of stats keyed by peer name.
std::map<std::string, int> rpcs_by_peer_;
// A two-level map of stats keyed at top level by RPC method and second level
// by peer name.
std::map<int, std::map<std::string, int>> rpcs_by_type_;
// Storing accumulated stats in the response proto format.
LoadBalancerAccumulatedStatsResponse accumulated_stats_;
std::mutex m_;
std::condition_variable cv_;
};
} // namespace testing
} // namespace grpc
#endif // GRPC_TEST_CPP_INTEROP_XDS_STATS_WATCHER_H

@ -0,0 +1,71 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "test/cpp/interop/xds_stats_watcher.h"
#include <map>
#include <memory>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include "test/core/util/test_config.h"
namespace grpc {
namespace testing {
namespace {
AsyncClientCallResult BuildCallResult(int saved_request_id) {
AsyncClientCallResult result;
result.saved_request_id = saved_request_id;
result.rpc_type = ClientConfigureRequest::UNARY_CALL;
return result;
}
TEST(XdsStatsWatcherTest, CollectsMetadata) {
XdsStatsWatcher watcher(0, 3);
watcher.RpcCompleted(BuildCallResult(0), "peer1");
watcher.RpcCompleted(BuildCallResult(1), "peer1");
watcher.RpcCompleted(BuildCallResult(2), "peer2");
LoadBalancerStatsResponse lb_response;
watcher.WaitForRpcStatsResponse(&lb_response, 1);
EXPECT_EQ(
(std::multimap<std::string, int32_t>(lb_response.rpcs_by_peer().begin(),
lb_response.rpcs_by_peer().end())),
(std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
EXPECT_EQ(lb_response.rpcs_by_method_size(), 1);
auto rpcs = lb_response.rpcs_by_method().find("UnaryCall");
EXPECT_NE(rpcs, lb_response.rpcs_by_method().end());
std::multimap<std::string, int32_t> by_peer(
rpcs->second.rpcs_by_peer().begin(), rpcs->second.rpcs_by_peer().end());
EXPECT_EQ(
by_peer,
(std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
grpc_init();
auto result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -11505,6 +11505,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "xds_stats_watcher_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save