From fc8923c0a3fd5842f5a555f9e7f4052c93cc5e80 Mon Sep 17 00:00:00 2001 From: Donna Dionne Date: Tue, 14 Jul 2020 18:23:18 -0700 Subject: [PATCH] Updating C++ interop client and server to do path matching and header matching tests. --- src/proto/grpc/testing/messages.proto | 5 + test/cpp/interop/xds_interop_client.cc | 143 +++++++++++++++++++++---- test/cpp/interop/xds_interop_server.cc | 8 ++ 3 files changed, 135 insertions(+), 21 deletions(-) diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index 5993bc6bf14..70e34277607 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -202,8 +202,13 @@ message LoadBalancerStatsRequest { } message LoadBalancerStatsResponse { + message RpcsByPeer { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + } // The number of completed RPCs for each peer. map rpcs_by_peer = 1; // The number of RPCs that failed to record a remote peer. int32 num_failures = 2; + map rpcs_by_method = 3; } diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index 6d7f3c21690..e8871a56fec 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -26,12 +26,15 @@ #include #include +#include "absl/strings/str_split.h" + #include #include #include #include #include +#include "src/core/lib/gpr/env.h" #include "src/proto/grpc/testing/empty.pb.h" #include "src/proto/grpc/testing/messages.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -46,6 +49,8 @@ DEFINE_int32(rpc_timeout_sec, 30, "Per RPC timeout seconds."); DEFINE_string(server, "localhost:50051", "Address of server."); DEFINE_int32(stats_port, 50052, "Port to expose peer distribution stats service."); +DEFINE_string(rpc, "UnaryCall", "a comma separated list of rpc methods."); +DEFINE_string(metadata, "", "metadata to send with the RPC."); using grpc::Channel; using grpc::ClientAsyncResponseReader; @@ -59,6 +64,7 @@ using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; using grpc::Status; +using grpc::testing::Empty; using grpc::testing::LoadBalancerStatsRequest; using grpc::testing::LoadBalancerStatsResponse; using grpc::testing::LoadBalancerStatsService; @@ -81,7 +87,8 @@ class XdsStatsWatcher { XdsStatsWatcher(int start_id, int end_id) : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {} - void RpcCompleted(int request_id, const std::string& peer) { + void RpcCompleted(int request_id, const std::string& rpc_method, + const std::string& peer) { if (start_id_ <= request_id && request_id < end_id_) { { std::lock_guard lk(m_); @@ -89,6 +96,7 @@ class XdsStatsWatcher { no_remote_peer_++; } else { rpcs_by_peer_[peer]++; + rpcs_by_method_[rpc_method][peer]++; } rpcs_needed_--; } @@ -104,6 +112,17 @@ class XdsStatsWatcher { [this] { return rpcs_needed_ == 0; }); response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), rpcs_by_peer_.end()); + auto& response_rpcs_by_method = *response->mutable_rpcs_by_method(); + for (const auto& rpc_by_method : rpcs_by_method_) { + auto& response_rpc_by_method = + response_rpcs_by_method[rpc_by_method.first]; + auto& response_rpcs_by_peer = + *response_rpc_by_method.mutable_rpcs_by_peer(); + for (const auto& rpc_by_peer : rpc_by_method.second) { + auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first]; + response_rpc_by_peer = rpc_by_peer.second; + } + } response->set_num_failures(no_remote_peer_ + rpcs_needed_); } } @@ -113,7 +132,11 @@ class XdsStatsWatcher { int end_id_; int rpcs_needed_; int no_remote_peer_ = 0; + // A map of stats keyed by peer name. std::map rpcs_by_peer_; + // A two-level map of stats keyed at top level by RPC method and second level + // by peer name. + std::map> rpcs_by_method_; std::mutex m_; std::condition_variable cv_; }; @@ -123,7 +146,8 @@ class TestClient { TestClient(const std::shared_ptr& channel) : stub_(TestService::NewStub(channel)) {} - void AsyncUnaryCall() { + void AsyncUnaryCall( + std::vector> metadata) { SimpleResponse response; int saved_request_id; { @@ -135,11 +159,41 @@ class TestClient { std::chrono::seconds(FLAGS_rpc_timeout_sec); AsyncClientCall* call = new AsyncClientCall; call->context.set_deadline(deadline); + for (const auto& data : metadata) { + call->context.AddMetadata(data.first, data.second); + } call->saved_request_id = saved_request_id; - call->response_reader = stub_->PrepareAsyncUnaryCall( + call->rpc_method = "UnaryCall"; + call->simple_response_reader = stub_->PrepareAsyncUnaryCall( &call->context, SimpleRequest::default_instance(), &cq_); - call->response_reader->StartCall(); - call->response_reader->Finish(&call->response, &call->status, (void*)call); + call->simple_response_reader->StartCall(); + call->simple_response_reader->Finish(&call->simple_response, &call->status, + (void*)call); + } + + void AsyncEmptyCall( + std::vector> metadata) { + Empty response; + int saved_request_id; + { + std::lock_guard lk(mu); + saved_request_id = ++global_request_id; + } + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + + std::chrono::seconds(FLAGS_rpc_timeout_sec); + AsyncClientCall* call = new AsyncClientCall; + call->context.set_deadline(deadline); + for (const auto& data : metadata) { + call->context.AddMetadata(data.first, data.second); + } + call->saved_request_id = saved_request_id; + call->rpc_method = "EmptyCall"; + call->empty_response_reader = stub_->PrepareAsyncEmptyCall( + &call->context, Empty::default_instance(), &cq_); + call->empty_response_reader->StartCall(); + call->empty_response_reader->Finish(&call->empty_response, &call->status, + (void*)call); } void AsyncCompleteRpc() { @@ -150,9 +204,17 @@ class TestClient { GPR_ASSERT(ok); { std::lock_guard lk(mu); + auto server_initial_metadata = call->context.GetServerInitialMetadata(); + auto metadata_hostname = + call->context.GetServerInitialMetadata().find("hostname"); + std::string hostname = + metadata_hostname != call->context.GetServerInitialMetadata().end() + ? std::string(metadata_hostname->second.data(), + metadata_hostname->second.length()) + : call->simple_response.hostname(); for (auto watcher : watchers) { - watcher->RpcCompleted(call->saved_request_id, - call->response.hostname()); + watcher->RpcCompleted(call->saved_request_id, call->rpc_method, + std::move(hostname)); } } @@ -166,9 +228,16 @@ class TestClient { } } else { if (FLAGS_print_response) { - std::cout << "Greeting: Hello world, this is " - << call->response.hostname() << ", from " - << call->context.peer() << std::endl; + auto metadata_hostname = + call->context.GetServerInitialMetadata().find("hostname"); + std::string hostname = + metadata_hostname != + call->context.GetServerInitialMetadata().end() + ? std::string(metadata_hostname->second.data(), + metadata_hostname->second.length()) + : call->simple_response.hostname(); + std::cout << "Greeting: Hello world, this is " << hostname + << ", from " << call->context.peer() << std::endl; } } @@ -178,11 +247,15 @@ class TestClient { private: struct AsyncClientCall { - SimpleResponse response; + Empty empty_response; + SimpleResponse simple_response; ClientContext context; Status status; int saved_request_id; - std::unique_ptr> response_reader; + std::string rpc_method; + std::unique_ptr> empty_response_reader; + std::unique_ptr> + simple_response_reader; }; std::unique_ptr stub_; @@ -214,10 +287,25 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service { } }; -void RunTestLoop(const std::string& server, - std::chrono::duration duration_per_query) { +void RunTestLoop(std::chrono::duration duration_per_query) { + std::vector rpc_methods = absl::StrSplit(FLAGS_rpc, ','); + // Store Metadata like + // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a + // map where the key is the RPC method and value is a vector of key:value + // pairs. {EmptyCall, [{key1,value1}], + // UnaryCall, [{key1,value1}, {key2,value2}]} + std::vector rpc_metadata = + absl::StrSplit(FLAGS_metadata, ','); + std::map>> + metadata_map; + for (auto& data : rpc_metadata) { + std::vector metadata = absl::StrSplit(data, ':'); + GPR_ASSERT(metadata.size() == 3); + metadata_map[std::string(metadata[0])].push_back( + {std::string(metadata[1]), std::string(metadata[2])}); + } TestClient client( - grpc::CreateChannel(server, grpc::InsecureChannelCredentials())); + grpc::CreateChannel(FLAGS_server, grpc::InsecureChannelCredentials())); std::chrono::time_point start = std::chrono::system_clock::now(); std::chrono::duration elapsed; @@ -225,10 +313,23 @@ void RunTestLoop(const std::string& server, std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client); while (true) { - elapsed = std::chrono::system_clock::now() - start; - if (elapsed > duration_per_query) { - start = std::chrono::system_clock::now(); - client.AsyncUnaryCall(); + for (const absl::string_view& rpc_method : rpc_methods) { + elapsed = std::chrono::system_clock::now() - start; + if (elapsed > duration_per_query) { + start = std::chrono::system_clock::now(); + auto metadata_iter = metadata_map.find(std::string(rpc_method)); + if (rpc_method == "EmptyCall") { + client.AsyncEmptyCall( + metadata_iter != metadata_map.end() + ? metadata_iter->second + : std::vector>()); + } else { + client.AsyncUnaryCall( + metadata_iter != metadata_map.end() + ? metadata_iter->second + : std::vector>()); + } + } } } thread.join(); @@ -255,6 +356,7 @@ void RunServer(const int port) { int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc::testing::InitTest(&argc, &argv, true); + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ROUTING", "true"); std::chrono::duration duration_per_query = std::chrono::nanoseconds(std::chrono::seconds(1)) / FLAGS_qps; @@ -263,8 +365,7 @@ int main(int argc, char** argv) { test_threads.reserve(FLAGS_num_channels); for (int i = 0; i < FLAGS_num_channels; i++) { - test_threads.emplace_back( - std::thread(&RunTestLoop, FLAGS_server, duration_per_query)); + test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query)); } RunServer(FLAGS_stats_port); diff --git a/test/cpp/interop/xds_interop_server.cc b/test/cpp/interop/xds_interop_server.cc index 59f2328cba8..11a551e3cae 100644 --- a/test/cpp/interop/xds_interop_server.cc +++ b/test/cpp/interop/xds_interop_server.cc @@ -46,6 +46,7 @@ using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; using grpc::Status; +using grpc::testing::Empty; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::TestService; @@ -58,6 +59,13 @@ class TestServiceImpl : public TestService::Service { SimpleResponse* response) { response->set_server_id(FLAGS_server_id); response->set_hostname(hostname_); + context->AddInitialMetadata("hostname", hostname_); + return Status::OK; + } + + Status EmptyCall(ServerContext* context, const Empty* request, + Empty* response) { + context->AddInitialMetadata("hostname", hostname_); return Status::OK; }