[PSM Interop] Revert "Report per-RPC metadata if requested. (#33939)" (#34028)

This reverts commit 6fadb994ef.
pull/33991/head
Eugene Ostroukhov 1 year ago committed by GitHub
parent df9ec2b0f0
commit fc9a1ccaed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      .clang-format
  2. 22
      src/proto/grpc/testing/messages.proto
  3. 25
      test/cpp/interop/xds_interop_client.cc
  4. 51
      test/cpp/interop/xds_stats_watcher.cc
  5. 14
      test/cpp/interop/xds_stats_watcher.h
  6. 77
      test/cpp/interop/xds_stats_watcher_test.cc

@ -46,8 +46,4 @@ Language: ObjC
BasedOnStyle: Google BasedOnStyle: Google
ColumnLimit: 100 ColumnLimit: 100
ObjCBlockIndentWidth: 2 ObjCBlockIndentWidth: 2
---
Language: Proto
BasedOnStyle: Google
ColumnLimit: 100
... ...

@ -103,6 +103,9 @@ message SimpleRequest {
// If set the server should record this metrics report data for the current RPC. // If set the server should record this metrics report data for the current RPC.
TestOrcaReport orca_per_query_report = 11; TestOrcaReport orca_per_query_report = 11;
// If set the server should update this metrics report data at the OOB server.
TestOrcaReport orca_oob_report = 12;
} }
// Unary response, as configured by the request. // Unary response, as configured by the request.
@ -207,26 +210,9 @@ message LoadBalancerStatsRequest {
int32 num_rpcs = 1; int32 num_rpcs = 1;
// If num_rpcs have not completed within timeout_sec, return partial results. // If num_rpcs have not completed within timeout_sec, return partial results.
int32 timeout_sec = 2; int32 timeout_sec = 2;
// response header+trailer we want the values of
repeated string metadata_keys = 3;
} }
message LoadBalancerStatsResponse { message LoadBalancerStatsResponse {
message MetadataEntry {
string key = 1;
string value = 2;
}
message RpcMetadata {
// metadata values for each rpc for the keys specified in
// LoadBalancerStatsRequest.metadata_keys.
// metadata keys and values are returned exactly as was recieved
// from the server.
repeated MetadataEntry metadata = 1;
}
message MetadataByPeer {
// List of RpcMetadata in for each RPC with a given peer
repeated RpcMetadata rpc_metadata = 1;
}
message RpcsByPeer { message RpcsByPeer {
// The number of completed RPCs for each peer. // The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1; map<string, int32> rpcs_by_peer = 1;
@ -236,8 +222,6 @@ message LoadBalancerStatsResponse {
// The number of RPCs that failed to record a remote peer. // The number of RPCs that failed to record a remote peer.
int32 num_failures = 2; int32 num_failures = 2;
map<string, RpcsByPeer> rpcs_by_method = 3; map<string, RpcsByPeer> rpcs_by_method = 3;
// All the metadata of all RPCs for each peer.
map<string, MetadataByPeer> metadatas_by_peer = 4;
} }
// Request for retrieving a test client's accumulated stats. // Request for retrieving a test client's accumulated stats.

@ -21,19 +21,16 @@
#include <condition_variable> #include <condition_variable>
#include <deque> #include <deque>
#include <map> #include <map>
#include <memory>
#include <mutex> #include <mutex>
#include <set> #include <set>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <thread> #include <thread>
#include <utility>
#include <vector> #include <vector>
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "absl/flags/flag.h" #include "absl/flags/flag.h"
#include "absl/strings/str_split.h" #include "absl/strings/str_split.h"
#include "google/protobuf/repeated_ptr_field.h"
#include <grpcpp/ext/admin_services.h> #include <grpcpp/ext/admin_services.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h> #include <grpcpp/ext/proto_server_reflection_plugin.h>
@ -209,8 +206,7 @@ class TestClient {
metadata_hostname->second.length()) metadata_hostname->second.length())
: call->result.simple_response.hostname(); : call->result.simple_response.hostname();
for (auto watcher : stats_watchers_->watchers) { for (auto watcher : stats_watchers_->watchers) {
watcher->RpcCompleted(call->result, hostname, watcher->RpcCompleted(call->result, hostname);
call->context.GetServerInitialMetadata());
} }
} }
@ -269,22 +265,20 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
LoadBalancerStatsResponse* response) override { LoadBalancerStatsResponse* response) override {
int start_id; int start_id;
int end_id; int end_id;
std::unique_ptr<XdsStatsWatcher> watcher; XdsStatsWatcher* watcher;
{ {
std::lock_guard<std::mutex> lock(stats_watchers_->mu); std::lock_guard<std::mutex> lock(stats_watchers_->mu);
start_id = stats_watchers_->global_request_id + 1; start_id = stats_watchers_->global_request_id + 1;
end_id = start_id + request->num_rpcs(); end_id = start_id + request->num_rpcs();
watcher = std::make_unique<XdsStatsWatcher>( watcher = new XdsStatsWatcher(start_id, end_id);
start_id, end_id, stats_watchers_->watchers.insert(watcher);
std::vector<std::string>(request->metadata_keys().begin(),
request->metadata_keys().end()));
stats_watchers_->watchers.insert(watcher.get());
} }
*response = watcher->WaitForRpcStatsResponse(request->timeout_sec()); watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
{ {
std::lock_guard<std::mutex> lock(stats_watchers_->mu); std::lock_guard<std::mutex> lock(stats_watchers_->mu);
stats_watchers_->watchers.erase(watcher.get()); stats_watchers_->watchers.erase(watcher);
} }
delete watcher;
return Status::OK; return Status::OK;
} }
@ -362,7 +356,8 @@ void RunTestLoop(std::chrono::duration<double> duration_per_query,
std::vector<RpcConfig> configs; std::vector<RpcConfig> configs;
while (true) { while (true) {
{ {
std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue); std::lock_guard<std::mutex> lockk(
rpc_configs_queue->mu_rpc_configs_queue);
if (!rpc_configs_queue->rpc_configs_queue.empty()) { if (!rpc_configs_queue->rpc_configs_queue.empty()) {
configs = std::move(rpc_configs_queue->rpc_configs_queue.front()); configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
rpc_configs_queue->rpc_configs_queue.pop_front(); rpc_configs_queue->rpc_configs_queue.pop_front();
@ -469,7 +464,7 @@ int main(int argc, char** argv) {
{ {
std::lock_guard<std::mutex> lock(stats_watchers.mu); std::lock_guard<std::mutex> lock(stats_watchers.mu);
stats_watchers.global_watcher = new XdsStatsWatcher(0, 0, {}); stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
stats_watchers.watchers.insert(stats_watchers.global_watcher); stats_watchers.watchers.insert(stats_watchers.global_watcher);
} }

@ -19,36 +19,11 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
namespace { XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id)
: start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
LoadBalancerStatsResponse::RpcMetadata BuildRpcMetadata( void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call,
absl::Span<const std::string> metadata_keys, const std::string& peer) {
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
LoadBalancerStatsResponse::RpcMetadata rpc_metadata;
for (const auto& key : metadata_keys) {
auto matching = initial_metadata.equal_range(key);
for (auto value = matching.first; value != matching.second; ++value) {
auto entry = rpc_metadata.add_metadata();
entry->set_key(key);
entry->set_value(
absl::string_view(value->second.data(), value->second.length()));
}
}
return rpc_metadata;
}
} // namespace
XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
absl::Span<const std::string> metadata_keys)
: start_id_(start_id),
end_id_(end_id),
rpcs_needed_(end_id - start_id),
metadata_keys_(metadata_keys.begin(), metadata_keys.end()) {}
void XdsStatsWatcher::RpcCompleted(
const AsyncClientCallResult& call, const std::string& peer,
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
// We count RPCs for global watcher or if the request_id falls into the // We count RPCs for global watcher or if the request_id falls into the
// watcher's interested range of request ids. // watcher's interested range of request ids.
if ((start_id_ == 0 && end_id_ == 0) || if ((start_id_ == 0 && end_id_ == 0) ||
@ -62,8 +37,6 @@ void XdsStatsWatcher::RpcCompleted(
// RPC is counted into both per-peer bin and per-method-per-peer bin. // RPC is counted into both per-peer bin and per-method-per-peer bin.
rpcs_by_peer_[peer]++; rpcs_by_peer_[peer]++;
rpcs_by_type_[call.rpc_type][peer]++; rpcs_by_type_[call.rpc_type][peer]++;
*metadata_by_peer_[peer].add_rpc_metadata() =
BuildRpcMetadata(metadata_keys_, initial_metadata);
} }
rpcs_needed_--; rpcs_needed_--;
// Report accumulated stats. // Report accumulated stats.
@ -82,17 +55,14 @@ void XdsStatsWatcher::RpcCompleted(
} }
} }
LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse( void XdsStatsWatcher::WaitForRpcStatsResponse(
int timeout_sec) { LoadBalancerStatsResponse* response, int timeout_sec) {
LoadBalancerStatsResponse response;
std::unique_lock<std::mutex> lock(m_); std::unique_lock<std::mutex> lock(m_);
cv_.wait_for(lock, std::chrono::seconds(timeout_sec), cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
[this] { return rpcs_needed_ == 0; }); [this] { return rpcs_needed_ == 0; });
response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(), response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
rpcs_by_peer_.end()); rpcs_by_peer_.end());
response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(), auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
metadata_by_peer_.end());
auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
for (const auto& rpc_by_type : rpcs_by_type_) { for (const auto& rpc_by_type : rpcs_by_type_) {
std::string method_name; std::string method_name;
if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) { if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
@ -113,8 +83,7 @@ LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
response_rpc_by_peer = rpc_by_peer.second; response_rpc_by_peer = rpc_by_peer.second;
} }
} }
response.set_num_failures(no_remote_peer_ + rpcs_needed_); response->set_num_failures(no_remote_peer_ + rpcs_needed_);
return response;
} }
void XdsStatsWatcher::GetCurrentRpcStats( void XdsStatsWatcher::GetCurrentRpcStats(

@ -32,7 +32,6 @@
#include <vector> #include <vector>
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/types/span.h"
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
@ -69,17 +68,15 @@ struct StatsWatchers {
/// Records the remote peer distribution for a given range of RPCs. /// Records the remote peer distribution for a given range of RPCs.
class XdsStatsWatcher { class XdsStatsWatcher {
public: public:
XdsStatsWatcher(int start_id, int end_id, XdsStatsWatcher(int start_id, int end_id);
absl::Span<const std::string> metadata_keys);
// Upon the completion of an RPC, we will look at the request_id, the // Upon the completion of an RPC, we will look at the request_id, the
// rpc_type, and the peer the RPC was sent to in order to count // rpc_type, and the peer the RPC was sent to in order to count
// this RPC into the right stats bin. // this RPC into the right stats bin.
void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer, void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer);
const std::multimap<grpc::string_ref, grpc::string_ref>&
initial_metadata);
LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec); void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
int timeout_sec);
void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response, void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
StatsWatchers* stats_watchers); StatsWatchers* stats_watchers);
@ -99,9 +96,6 @@ class XdsStatsWatcher {
LoadBalancerAccumulatedStatsResponse accumulated_stats_; LoadBalancerAccumulatedStatsResponse accumulated_stats_;
std::mutex m_; std::mutex m_;
std::condition_variable cv_; std::condition_variable cv_;
std::vector<std::string> metadata_keys_;
std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>
metadata_by_peer_;
}; };
} // namespace testing } // namespace testing

@ -29,7 +29,6 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
namespace { namespace {
AsyncClientCallResult BuildCallResult(int saved_request_id) { AsyncClientCallResult BuildCallResult(int saved_request_id) {
AsyncClientCallResult result; AsyncClientCallResult result;
result.saved_request_id = saved_request_id; result.saved_request_id = saved_request_id;
@ -37,63 +36,25 @@ AsyncClientCallResult BuildCallResult(int saved_request_id) {
return result; return result;
} }
LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas( TEST(XdsStatsWatcherTest, CollectsMetadata) {
const std::initializer_list< XdsStatsWatcher watcher(0, 3);
std::initializer_list<std::pair<std::string, std::string>>>& values) { watcher.RpcCompleted(BuildCallResult(0), "peer1");
LoadBalancerStatsResponse::MetadataByPeer metadata_by_peer; watcher.RpcCompleted(BuildCallResult(1), "peer1");
for (const auto& per_rpc : values) { watcher.RpcCompleted(BuildCallResult(2), "peer2");
auto rpc_metadata = metadata_by_peer.add_rpc_metadata(); LoadBalancerStatsResponse lb_response;
for (const auto& key_value : per_rpc) { watcher.WaitForRpcStatsResponse(&lb_response, 1);
auto entry = rpc_metadata->add_metadata(); EXPECT_EQ(
entry->set_key(key_value.first); (std::multimap<std::string, int32_t>(lb_response.rpcs_by_peer().begin(),
entry->set_value(key_value.second); lb_response.rpcs_by_peer().end())),
} (std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
} EXPECT_EQ(lb_response.rpcs_by_method_size(), 1);
return metadata_by_peer; auto rpcs = lb_response.rpcs_by_method().find("UnaryCall");
} EXPECT_NE(rpcs, lb_response.rpcs_by_method().end());
std::multimap<std::string, int32_t> by_peer(
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponse) { rpcs->second.rpcs_by_peer().begin(), rpcs->second.rpcs_by_peer().end());
// "k3" will be ignored EXPECT_EQ(
XdsStatsWatcher watcher(0, 3, {"k1", "k2"}); by_peer,
watcher.RpcCompleted(BuildCallResult(0), "peer1", (std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
expected.mutable_metadatas_by_peer()->insert({
{"peer1", BuildMetadatas({{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})},
{"peer2", BuildMetadatas({{{"k1", "v5"}, {"k2", "v6"}}})},
});
(*expected.mutable_rpcs_by_method())["UnaryCall"]
.mutable_rpcs_by_peer()
->insert({{"peer1", 2}, {"peer2", 1}});
EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
expected.DebugString());
}
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) {
XdsStatsWatcher watcher(0, 3, {});
// RPC had metadata - but watcher should ignore it
watcher.RpcCompleted(BuildCallResult(0), "peer1",
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
// No metadata came with RPC
watcher.RpcCompleted(BuildCallResult(1), "peer1", {});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
// There will still be an empty metadata collection for each RPC
expected.mutable_metadatas_by_peer()->insert({
{"peer1", BuildMetadatas({{}, {}})},
{"peer2", BuildMetadatas({{}})},
});
(*expected.mutable_rpcs_by_method())["UnaryCall"]
.mutable_rpcs_by_peer()
->insert({{"peer1", 2}, {"peer2", 1}});
EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
expected.DebugString());
} }
} // namespace } // namespace

Loading…
Cancel
Save