diff --git a/.clang-format b/.clang-format index 608eea14056..64387e9e515 100644 --- a/.clang-format +++ b/.clang-format @@ -46,4 +46,8 @@ Language: ObjC BasedOnStyle: Google ColumnLimit: 100 ObjCBlockIndentWidth: 2 +--- +Language: Proto +BasedOnStyle: Google +ColumnLimit: 100 ... diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index 33d07f47fdd..d948ef6a929 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -103,9 +103,6 @@ message SimpleRequest { // If set the server should record this metrics report data for the current RPC. TestOrcaReport orca_per_query_report = 11; - - // If set the server should update this metrics report data at the OOB server. - TestOrcaReport orca_oob_report = 12; } // Unary response, as configured by the request. @@ -210,9 +207,26 @@ message LoadBalancerStatsRequest { int32 num_rpcs = 1; // If num_rpcs have not completed within timeout_sec, return partial results. int32 timeout_sec = 2; + // response header+trailer we want the values of + repeated string metadata_keys = 3; } message LoadBalancerStatsResponse { + message MetadataEntry { + string key = 1; + string value = 2; + } + message RpcMetadata { + // metadata values for each rpc for the keys specified in + // LoadBalancerStatsRequest.metadata_keys. + // metadata keys and values are returned exactly as was recieved + // from the server. + repeated MetadataEntry metadata = 1; + } + message MetadataByPeer { + // List of RpcMetadata in for each RPC with a given peer + repeated RpcMetadata rpc_metadata = 1; + } message RpcsByPeer { // The number of completed RPCs for each peer. map rpcs_by_peer = 1; @@ -222,6 +236,8 @@ message LoadBalancerStatsResponse { // The number of RPCs that failed to record a remote peer. int32 num_failures = 2; map rpcs_by_method = 3; + // All the metadata of all RPCs for each peer. + map metadatas_by_peer = 4; } // Request for retrieving a test client's accumulated stats. diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index e8df4a40b21..9b6fe30e7ca 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -21,11 +21,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include "absl/algorithm/container.h" @@ -206,7 +208,8 @@ class TestClient { metadata_hostname->second.length()) : call->result.simple_response.hostname(); for (auto watcher : stats_watchers_->watchers) { - watcher->RpcCompleted(call->result, hostname); + watcher->RpcCompleted(call->result, hostname, + call->context.GetServerInitialMetadata()); } } @@ -265,20 +268,22 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service { LoadBalancerStatsResponse* response) override { int start_id; int end_id; - XdsStatsWatcher* watcher; + std::unique_ptr watcher; { std::lock_guard lock(stats_watchers_->mu); start_id = stats_watchers_->global_request_id + 1; end_id = start_id + request->num_rpcs(); - watcher = new XdsStatsWatcher(start_id, end_id); - stats_watchers_->watchers.insert(watcher); + watcher = std::make_unique( + start_id, end_id, + std::vector(request->metadata_keys().begin(), + request->metadata_keys().end())); + stats_watchers_->watchers.insert(watcher.get()); } - watcher->WaitForRpcStatsResponse(response, request->timeout_sec()); + *response = watcher->WaitForRpcStatsResponse(request->timeout_sec()); { std::lock_guard lock(stats_watchers_->mu); - stats_watchers_->watchers.erase(watcher); + stats_watchers_->watchers.erase(watcher.get()); } - delete watcher; return Status::OK; } @@ -356,8 +361,7 @@ void RunTestLoop(std::chrono::duration duration_per_query, std::vector configs; while (true) { { - std::lock_guard lockk( - rpc_configs_queue->mu_rpc_configs_queue); + std::lock_guard lock(rpc_configs_queue->mu_rpc_configs_queue); if (!rpc_configs_queue->rpc_configs_queue.empty()) { configs = std::move(rpc_configs_queue->rpc_configs_queue.front()); rpc_configs_queue->rpc_configs_queue.pop_front(); @@ -464,7 +468,7 @@ int main(int argc, char** argv) { { std::lock_guard lock(stats_watchers.mu); - stats_watchers.global_watcher = new XdsStatsWatcher(0, 0); + stats_watchers.global_watcher = new XdsStatsWatcher(0, 0, {}); stats_watchers.watchers.insert(stats_watchers.global_watcher); } diff --git a/test/cpp/interop/xds_stats_watcher.cc b/test/cpp/interop/xds_stats_watcher.cc index e6b1f80908e..1a5dec9063f 100644 --- a/test/cpp/interop/xds_stats_watcher.cc +++ b/test/cpp/interop/xds_stats_watcher.cc @@ -19,11 +19,36 @@ namespace grpc { namespace testing { -XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id) - : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {} +namespace { -void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call, - const std::string& peer) { +LoadBalancerStatsResponse::RpcMetadata BuildRpcMetadata( + absl::Span metadata_keys, + const std::multimap& initial_metadata) { + LoadBalancerStatsResponse::RpcMetadata rpc_metadata; + for (const auto& key : metadata_keys) { + auto matching = initial_metadata.equal_range(key); + for (auto value = matching.first; value != matching.second; ++value) { + auto entry = rpc_metadata.add_metadata(); + entry->set_key(key); + entry->set_value( + absl::string_view(value->second.data(), value->second.length())); + } + } + return rpc_metadata; +} + +} // namespace + +XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id, + absl::Span metadata_keys) + : start_id_(start_id), + end_id_(end_id), + rpcs_needed_(end_id - start_id), + metadata_keys_(metadata_keys.begin(), metadata_keys.end()) {} + +void XdsStatsWatcher::RpcCompleted( + const AsyncClientCallResult& call, const std::string& peer, + const std::multimap& initial_metadata) { // We count RPCs for global watcher or if the request_id falls into the // watcher's interested range of request ids. if ((start_id_ == 0 && end_id_ == 0) || @@ -37,6 +62,8 @@ void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call, // RPC is counted into both per-peer bin and per-method-per-peer bin. rpcs_by_peer_[peer]++; rpcs_by_type_[call.rpc_type][peer]++; + *metadata_by_peer_[peer].add_rpc_metadata() = + BuildRpcMetadata(metadata_keys_, initial_metadata); } rpcs_needed_--; // Report accumulated stats. @@ -55,14 +82,17 @@ void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call, } } -void XdsStatsWatcher::WaitForRpcStatsResponse( - LoadBalancerStatsResponse* response, int timeout_sec) { +LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse( + int timeout_sec) { + LoadBalancerStatsResponse response; std::unique_lock lock(m_); cv_.wait_for(lock, std::chrono::seconds(timeout_sec), [this] { return rpcs_needed_ == 0; }); - response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), - rpcs_by_peer_.end()); - auto& response_rpcs_by_method = *response->mutable_rpcs_by_method(); + response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), + rpcs_by_peer_.end()); + response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(), + metadata_by_peer_.end()); + auto& response_rpcs_by_method = *response.mutable_rpcs_by_method(); for (const auto& rpc_by_type : rpcs_by_type_) { std::string method_name; if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) { @@ -83,7 +113,8 @@ void XdsStatsWatcher::WaitForRpcStatsResponse( response_rpc_by_peer = rpc_by_peer.second; } } - response->set_num_failures(no_remote_peer_ + rpcs_needed_); + response.set_num_failures(no_remote_peer_ + rpcs_needed_); + return response; } void XdsStatsWatcher::GetCurrentRpcStats( diff --git a/test/cpp/interop/xds_stats_watcher.h b/test/cpp/interop/xds_stats_watcher.h index a5917104f2b..6bce6e64fdb 100644 --- a/test/cpp/interop/xds_stats_watcher.h +++ b/test/cpp/interop/xds_stats_watcher.h @@ -32,6 +32,7 @@ #include #include "absl/status/status.h" +#include "absl/types/span.h" #include @@ -68,15 +69,17 @@ struct StatsWatchers { /// Records the remote peer distribution for a given range of RPCs. class XdsStatsWatcher { public: - XdsStatsWatcher(int start_id, int end_id); + XdsStatsWatcher(int start_id, int end_id, + absl::Span metadata_keys); // Upon the completion of an RPC, we will look at the request_id, the // rpc_type, and the peer the RPC was sent to in order to count // this RPC into the right stats bin. - void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer); + void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer, + const std::multimap& + initial_metadata); - void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response, - int timeout_sec); + LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec); void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, StatsWatchers* stats_watchers); @@ -96,6 +99,9 @@ class XdsStatsWatcher { LoadBalancerAccumulatedStatsResponse accumulated_stats_; std::mutex m_; std::condition_variable cv_; + std::vector metadata_keys_; + std::map + metadata_by_peer_; }; } // namespace testing diff --git a/test/cpp/interop/xds_stats_watcher_test.cc b/test/cpp/interop/xds_stats_watcher_test.cc index 63967095e7f..4340a51b2d9 100644 --- a/test/cpp/interop/xds_stats_watcher_test.cc +++ b/test/cpp/interop/xds_stats_watcher_test.cc @@ -29,6 +29,7 @@ namespace grpc { namespace testing { namespace { + AsyncClientCallResult BuildCallResult(int saved_request_id) { AsyncClientCallResult result; result.saved_request_id = saved_request_id; @@ -36,25 +37,63 @@ AsyncClientCallResult BuildCallResult(int saved_request_id) { return result; } -TEST(XdsStatsWatcherTest, CollectsMetadata) { - XdsStatsWatcher watcher(0, 3); - watcher.RpcCompleted(BuildCallResult(0), "peer1"); - watcher.RpcCompleted(BuildCallResult(1), "peer1"); - watcher.RpcCompleted(BuildCallResult(2), "peer2"); - LoadBalancerStatsResponse lb_response; - watcher.WaitForRpcStatsResponse(&lb_response, 1); - EXPECT_EQ( - (std::multimap(lb_response.rpcs_by_peer().begin(), - lb_response.rpcs_by_peer().end())), - (std::multimap({{"peer1", 2}, {"peer2", 1}}))); - EXPECT_EQ(lb_response.rpcs_by_method_size(), 1); - auto rpcs = lb_response.rpcs_by_method().find("UnaryCall"); - EXPECT_NE(rpcs, lb_response.rpcs_by_method().end()); - std::multimap by_peer( - rpcs->second.rpcs_by_peer().begin(), rpcs->second.rpcs_by_peer().end()); - EXPECT_EQ( - by_peer, - (std::multimap({{"peer1", 2}, {"peer2", 1}}))); +LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas( + const std::initializer_list< + std::initializer_list>>& values) { + LoadBalancerStatsResponse::MetadataByPeer metadata_by_peer; + for (const auto& per_rpc : values) { + auto rpc_metadata = metadata_by_peer.add_rpc_metadata(); + for (const auto& key_value : per_rpc) { + auto entry = rpc_metadata->add_metadata(); + entry->set_key(key_value.first); + entry->set_value(key_value.second); + } + } + return metadata_by_peer; +} + +TEST(XdsStatsWatcherTest, WaitForRpcStatsResponse) { + // "k3" will be ignored + XdsStatsWatcher watcher(0, 3, {"k1", "k2"}); + watcher.RpcCompleted(BuildCallResult(0), "peer1", + {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}); + watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}}); + watcher.RpcCompleted(BuildCallResult(2), "peer2", + {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}}); + LoadBalancerStatsResponse expected; + expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}}); + expected.mutable_metadatas_by_peer()->insert({ + {"peer1", BuildMetadatas({{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})}, + {"peer2", BuildMetadatas({{{"k1", "v5"}, {"k2", "v6"}}})}, + }); + (*expected.mutable_rpcs_by_method())["UnaryCall"] + .mutable_rpcs_by_peer() + ->insert({{"peer1", 2}, {"peer2", 1}}); + EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(), + expected.DebugString()); +} + +TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) { + XdsStatsWatcher watcher(0, 3, {}); + // RPC had metadata - but watcher should ignore it + watcher.RpcCompleted(BuildCallResult(0), "peer1", + {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}); + // No metadata came with RPC + watcher.RpcCompleted(BuildCallResult(1), "peer1", {}); + watcher.RpcCompleted(BuildCallResult(2), "peer2", + {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}}); + LoadBalancerStatsResponse expected; + expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}}); + // There will still be an empty metadata collection for each RPC + expected.mutable_metadatas_by_peer()->insert({ + {"peer1", BuildMetadatas({{}, {}})}, + {"peer2", BuildMetadatas({{}})}, + }); + (*expected.mutable_rpcs_by_method())["UnaryCall"] + .mutable_rpcs_by_peer() + ->insert({{"peer1", 2}, {"peer2", 1}}); + EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(), + expected.DebugString()); } } // namespace