diff --git a/.clang-format b/.clang-format index 64387e9e515..608eea14056 100644 --- a/.clang-format +++ b/.clang-format @@ -46,8 +46,4 @@ Language: ObjC BasedOnStyle: Google ColumnLimit: 100 ObjCBlockIndentWidth: 2 ---- -Language: Proto -BasedOnStyle: Google -ColumnLimit: 100 ... diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index d948ef6a929..33d07f47fdd 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -103,6 +103,9 @@ message SimpleRequest { // If set the server should record this metrics report data for the current RPC. TestOrcaReport orca_per_query_report = 11; + + // If set the server should update this metrics report data at the OOB server. + TestOrcaReport orca_oob_report = 12; } // Unary response, as configured by the request. @@ -207,26 +210,9 @@ message LoadBalancerStatsRequest { int32 num_rpcs = 1; // If num_rpcs have not completed within timeout_sec, return partial results. int32 timeout_sec = 2; - // response header+trailer we want the values of - repeated string metadata_keys = 3; } message LoadBalancerStatsResponse { - message MetadataEntry { - string key = 1; - string value = 2; - } - message RpcMetadata { - // metadata values for each rpc for the keys specified in - // LoadBalancerStatsRequest.metadata_keys. - // metadata keys and values are returned exactly as was recieved - // from the server. - repeated MetadataEntry metadata = 1; - } - message MetadataByPeer { - // List of RpcMetadata in for each RPC with a given peer - repeated RpcMetadata rpc_metadata = 1; - } message RpcsByPeer { // The number of completed RPCs for each peer. map rpcs_by_peer = 1; @@ -236,8 +222,6 @@ message LoadBalancerStatsResponse { // The number of RPCs that failed to record a remote peer. int32 num_failures = 2; map rpcs_by_method = 3; - // All the metadata of all RPCs for each peer. - map metadatas_by_peer = 4; } // Request for retrieving a test client's accumulated stats. diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index 1f6cf9b8332..e8df4a40b21 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -21,19 +21,16 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include "absl/algorithm/container.h" #include "absl/flags/flag.h" #include "absl/strings/str_split.h" -#include "google/protobuf/repeated_ptr_field.h" #include #include @@ -209,8 +206,7 @@ class TestClient { metadata_hostname->second.length()) : call->result.simple_response.hostname(); for (auto watcher : stats_watchers_->watchers) { - watcher->RpcCompleted(call->result, hostname, - call->context.GetServerInitialMetadata()); + watcher->RpcCompleted(call->result, hostname); } } @@ -269,22 +265,20 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service { LoadBalancerStatsResponse* response) override { int start_id; int end_id; - std::unique_ptr watcher; + XdsStatsWatcher* watcher; { std::lock_guard lock(stats_watchers_->mu); start_id = stats_watchers_->global_request_id + 1; end_id = start_id + request->num_rpcs(); - watcher = std::make_unique( - start_id, end_id, - std::vector(request->metadata_keys().begin(), - request->metadata_keys().end())); - stats_watchers_->watchers.insert(watcher.get()); + watcher = new XdsStatsWatcher(start_id, end_id); + stats_watchers_->watchers.insert(watcher); } - *response = watcher->WaitForRpcStatsResponse(request->timeout_sec()); + watcher->WaitForRpcStatsResponse(response, request->timeout_sec()); { std::lock_guard lock(stats_watchers_->mu); - stats_watchers_->watchers.erase(watcher.get()); + stats_watchers_->watchers.erase(watcher); } + delete watcher; return Status::OK; } @@ -362,7 +356,8 @@ void RunTestLoop(std::chrono::duration duration_per_query, std::vector configs; while (true) { { - std::lock_guard lock(rpc_configs_queue->mu_rpc_configs_queue); + std::lock_guard lockk( + rpc_configs_queue->mu_rpc_configs_queue); if (!rpc_configs_queue->rpc_configs_queue.empty()) { configs = std::move(rpc_configs_queue->rpc_configs_queue.front()); rpc_configs_queue->rpc_configs_queue.pop_front(); @@ -469,7 +464,7 @@ int main(int argc, char** argv) { { std::lock_guard lock(stats_watchers.mu); - stats_watchers.global_watcher = new XdsStatsWatcher(0, 0, {}); + stats_watchers.global_watcher = new XdsStatsWatcher(0, 0); stats_watchers.watchers.insert(stats_watchers.global_watcher); } diff --git a/test/cpp/interop/xds_stats_watcher.cc b/test/cpp/interop/xds_stats_watcher.cc index 1a5dec9063f..e6b1f80908e 100644 --- a/test/cpp/interop/xds_stats_watcher.cc +++ b/test/cpp/interop/xds_stats_watcher.cc @@ -19,36 +19,11 @@ namespace grpc { namespace testing { -namespace { +XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id) + : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {} -LoadBalancerStatsResponse::RpcMetadata BuildRpcMetadata( - absl::Span metadata_keys, - const std::multimap& initial_metadata) { - LoadBalancerStatsResponse::RpcMetadata rpc_metadata; - for (const auto& key : metadata_keys) { - auto matching = initial_metadata.equal_range(key); - for (auto value = matching.first; value != matching.second; ++value) { - auto entry = rpc_metadata.add_metadata(); - entry->set_key(key); - entry->set_value( - absl::string_view(value->second.data(), value->second.length())); - } - } - return rpc_metadata; -} - -} // namespace - -XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id, - absl::Span metadata_keys) - : start_id_(start_id), - end_id_(end_id), - rpcs_needed_(end_id - start_id), - metadata_keys_(metadata_keys.begin(), metadata_keys.end()) {} - -void XdsStatsWatcher::RpcCompleted( - const AsyncClientCallResult& call, const std::string& peer, - const std::multimap& initial_metadata) { +void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call, + const std::string& peer) { // We count RPCs for global watcher or if the request_id falls into the // watcher's interested range of request ids. if ((start_id_ == 0 && end_id_ == 0) || @@ -62,8 +37,6 @@ void XdsStatsWatcher::RpcCompleted( // RPC is counted into both per-peer bin and per-method-per-peer bin. rpcs_by_peer_[peer]++; rpcs_by_type_[call.rpc_type][peer]++; - *metadata_by_peer_[peer].add_rpc_metadata() = - BuildRpcMetadata(metadata_keys_, initial_metadata); } rpcs_needed_--; // Report accumulated stats. @@ -82,17 +55,14 @@ void XdsStatsWatcher::RpcCompleted( } } -LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse( - int timeout_sec) { - LoadBalancerStatsResponse response; +void XdsStatsWatcher::WaitForRpcStatsResponse( + LoadBalancerStatsResponse* response, int timeout_sec) { std::unique_lock lock(m_); cv_.wait_for(lock, std::chrono::seconds(timeout_sec), [this] { return rpcs_needed_ == 0; }); - response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), - rpcs_by_peer_.end()); - response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(), - metadata_by_peer_.end()); - auto& response_rpcs_by_method = *response.mutable_rpcs_by_method(); + response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), + rpcs_by_peer_.end()); + auto& response_rpcs_by_method = *response->mutable_rpcs_by_method(); for (const auto& rpc_by_type : rpcs_by_type_) { std::string method_name; if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) { @@ -113,8 +83,7 @@ LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse( response_rpc_by_peer = rpc_by_peer.second; } } - response.set_num_failures(no_remote_peer_ + rpcs_needed_); - return response; + response->set_num_failures(no_remote_peer_ + rpcs_needed_); } void XdsStatsWatcher::GetCurrentRpcStats( diff --git a/test/cpp/interop/xds_stats_watcher.h b/test/cpp/interop/xds_stats_watcher.h index 6bce6e64fdb..a5917104f2b 100644 --- a/test/cpp/interop/xds_stats_watcher.h +++ b/test/cpp/interop/xds_stats_watcher.h @@ -32,7 +32,6 @@ #include #include "absl/status/status.h" -#include "absl/types/span.h" #include @@ -69,17 +68,15 @@ struct StatsWatchers { /// Records the remote peer distribution for a given range of RPCs. class XdsStatsWatcher { public: - XdsStatsWatcher(int start_id, int end_id, - absl::Span metadata_keys); + XdsStatsWatcher(int start_id, int end_id); // Upon the completion of an RPC, we will look at the request_id, the // rpc_type, and the peer the RPC was sent to in order to count // this RPC into the right stats bin. - void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer, - const std::multimap& - initial_metadata); + void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer); - LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec); + void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response, + int timeout_sec); void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, StatsWatchers* stats_watchers); @@ -99,9 +96,6 @@ class XdsStatsWatcher { LoadBalancerAccumulatedStatsResponse accumulated_stats_; std::mutex m_; std::condition_variable cv_; - std::vector metadata_keys_; - std::map - metadata_by_peer_; }; } // namespace testing diff --git a/test/cpp/interop/xds_stats_watcher_test.cc b/test/cpp/interop/xds_stats_watcher_test.cc index 4340a51b2d9..63967095e7f 100644 --- a/test/cpp/interop/xds_stats_watcher_test.cc +++ b/test/cpp/interop/xds_stats_watcher_test.cc @@ -29,7 +29,6 @@ namespace grpc { namespace testing { namespace { - AsyncClientCallResult BuildCallResult(int saved_request_id) { AsyncClientCallResult result; result.saved_request_id = saved_request_id; @@ -37,63 +36,25 @@ AsyncClientCallResult BuildCallResult(int saved_request_id) { return result; } -LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas( - const std::initializer_list< - std::initializer_list>>& values) { - LoadBalancerStatsResponse::MetadataByPeer metadata_by_peer; - for (const auto& per_rpc : values) { - auto rpc_metadata = metadata_by_peer.add_rpc_metadata(); - for (const auto& key_value : per_rpc) { - auto entry = rpc_metadata->add_metadata(); - entry->set_key(key_value.first); - entry->set_value(key_value.second); - } - } - return metadata_by_peer; -} - -TEST(XdsStatsWatcherTest, WaitForRpcStatsResponse) { - // "k3" will be ignored - XdsStatsWatcher watcher(0, 3, {"k1", "k2"}); - watcher.RpcCompleted(BuildCallResult(0), "peer1", - {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}); - watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}}); - watcher.RpcCompleted(BuildCallResult(2), "peer2", - {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}}); - LoadBalancerStatsResponse expected; - expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}}); - expected.mutable_metadatas_by_peer()->insert({ - {"peer1", BuildMetadatas({{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})}, - {"peer2", BuildMetadatas({{{"k1", "v5"}, {"k2", "v6"}}})}, - }); - (*expected.mutable_rpcs_by_method())["UnaryCall"] - .mutable_rpcs_by_peer() - ->insert({{"peer1", 2}, {"peer2", 1}}); - EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(), - expected.DebugString()); -} - -TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) { - XdsStatsWatcher watcher(0, 3, {}); - // RPC had metadata - but watcher should ignore it - watcher.RpcCompleted(BuildCallResult(0), "peer1", - {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}}); - // No metadata came with RPC - watcher.RpcCompleted(BuildCallResult(1), "peer1", {}); - watcher.RpcCompleted(BuildCallResult(2), "peer2", - {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}}); - LoadBalancerStatsResponse expected; - expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}}); - // There will still be an empty metadata collection for each RPC - expected.mutable_metadatas_by_peer()->insert({ - {"peer1", BuildMetadatas({{}, {}})}, - {"peer2", BuildMetadatas({{}})}, - }); - (*expected.mutable_rpcs_by_method())["UnaryCall"] - .mutable_rpcs_by_peer() - ->insert({{"peer1", 2}, {"peer2", 1}}); - EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(), - expected.DebugString()); +TEST(XdsStatsWatcherTest, CollectsMetadata) { + XdsStatsWatcher watcher(0, 3); + watcher.RpcCompleted(BuildCallResult(0), "peer1"); + watcher.RpcCompleted(BuildCallResult(1), "peer1"); + watcher.RpcCompleted(BuildCallResult(2), "peer2"); + LoadBalancerStatsResponse lb_response; + watcher.WaitForRpcStatsResponse(&lb_response, 1); + EXPECT_EQ( + (std::multimap(lb_response.rpcs_by_peer().begin(), + lb_response.rpcs_by_peer().end())), + (std::multimap({{"peer1", 2}, {"peer2", 1}}))); + EXPECT_EQ(lb_response.rpcs_by_method_size(), 1); + auto rpcs = lb_response.rpcs_by_method().find("UnaryCall"); + EXPECT_NE(rpcs, lb_response.rpcs_by_method().end()); + std::multimap by_peer( + rpcs->second.rpcs_by_peer().begin(), rpcs->second.rpcs_by_peer().end()); + EXPECT_EQ( + by_peer, + (std::multimap({{"peer1", 2}, {"peer2", 1}}))); } } // namespace