[PSM Interop] Restore "Report per-RPC metadata if requested. (#33939)" (#34037)

commit 44de3ab221 (pull/34040/head)
parent 4a24d8250f
Eugene Ostroukhov authored 2 years ago, committed by GitHub
Files changed (6):

  1. .clang-format (4 lines changed)
  2. src/proto/grpc/testing/messages.proto (22 lines changed)
  3. test/cpp/interop/xds_interop_client.cc (24 lines changed)
  4. test/cpp/interop/xds_stats_watcher.cc (51 lines changed)
  5. test/cpp/interop/xds_stats_watcher.h (14 lines changed)
  6. test/cpp/interop/xds_stats_watcher_test.cc (77 lines changed)

--- a/.clang-format
+++ b/.clang-format
@@ -46,4 +46,8 @@ Language: ObjC
 BasedOnStyle: Google
 ColumnLimit: 100
 ObjCBlockIndentWidth: 2
+---
+Language: Proto
+BasedOnStyle: Google
+ColumnLimit: 100
 ...

--- a/src/proto/grpc/testing/messages.proto
+++ b/src/proto/grpc/testing/messages.proto
@@ -103,9 +103,6 @@ message SimpleRequest {
 
   // If set the server should record this metrics report data for the current RPC.
   TestOrcaReport orca_per_query_report = 11;
-
-  // If set the server should update this metrics report data at the OOB server.
-  TestOrcaReport orca_oob_report = 12;
 }
 
 // Unary response, as configured by the request.
@@ -210,9 +207,26 @@ message LoadBalancerStatsRequest {
   int32 num_rpcs = 1;
   // If num_rpcs have not completed within timeout_sec, return partial results.
   int32 timeout_sec = 2;
+  // response header+trailer we want the values of
+  repeated string metadata_keys = 3;
 }
 
 message LoadBalancerStatsResponse {
+  message MetadataEntry {
+    string key = 1;
+    string value = 2;
+  }
+  message RpcMetadata {
+    // metadata values for each rpc for the keys specified in
+    // LoadBalancerStatsRequest.metadata_keys.
+    // metadata keys and values are returned exactly as was recieved
+    // from the server.
+    repeated MetadataEntry metadata = 1;
+  }
+  message MetadataByPeer {
+    // List of RpcMetadata in for each RPC with a given peer
+    repeated RpcMetadata rpc_metadata = 1;
+  }
   message RpcsByPeer {
     // The number of completed RPCs for each peer.
     map<string, int32> rpcs_by_peer = 1;
@@ -222,6 +236,8 @@ message LoadBalancerStatsResponse {
   // The number of RPCs that failed to record a remote peer.
   int32 num_failures = 2;
   map<string, RpcsByPeer> rpcs_by_method = 3;
+  // All the metadata of all RPCs for each peer.
+  map<string, MetadataByPeer> metadatas_by_peer = 4;
 }
 
 // Request for retrieving a test client's accumulated stats.
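For orientation, here is a minimal sketch (not part of this commit) of how a test driver could exercise the new request and response fields through the generated C++ protobuf API for messages.proto. The include path, the "rpc-behavior" key, and the surrounding main() are illustrative assumptions.

// Illustrative only: building a request that asks for per-RPC metadata and
// walking a response. Include path and metadata key are assumptions.
#include <iostream>

#include "src/proto/grpc/testing/messages.pb.h"

using grpc::testing::LoadBalancerStatsRequest;
using grpc::testing::LoadBalancerStatsResponse;

int main() {
  // Ask the client to run 10 RPCs and capture the values of the
  // "rpc-behavior" response header for each of them.
  LoadBalancerStatsRequest request;
  request.set_num_rpcs(10);
  request.set_timeout_sec(30);
  request.add_metadata_keys("rpc-behavior");

  // Walking a response: one MetadataByPeer per peer, one RpcMetadata per RPC,
  // one MetadataEntry per captured header value.
  LoadBalancerStatsResponse response;  // would normally come from the service
  for (const auto& peer : response.metadatas_by_peer()) {
    for (const auto& rpc : peer.second.rpc_metadata()) {
      for (const auto& entry : rpc.metadata()) {
        std::cout << peer.first << ": " << entry.key() << "=" << entry.value()
                  << "\n";
      }
    }
  }
  return 0;
}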

--- a/test/cpp/interop/xds_interop_client.cc
+++ b/test/cpp/interop/xds_interop_client.cc
@@ -21,11 +21,13 @@
 #include <condition_variable>
 #include <deque>
 #include <map>
+#include <memory>
 #include <mutex>
 #include <set>
 #include <sstream>
 #include <string>
 #include <thread>
+#include <utility>
 #include <vector>
 
 #include "absl/algorithm/container.h"
@@ -206,7 +208,8 @@ class TestClient {
                            metadata_hostname->second.length())
              : call->result.simple_response.hostname();
     for (auto watcher : stats_watchers_->watchers) {
-      watcher->RpcCompleted(call->result, hostname);
+      watcher->RpcCompleted(call->result, hostname,
+                            call->context.GetServerInitialMetadata());
     }
   }
 
@@ -265,20 +268,22 @@ class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
                         LoadBalancerStatsResponse* response) override {
     int start_id;
     int end_id;
-    XdsStatsWatcher* watcher;
+    std::unique_ptr<XdsStatsWatcher> watcher;
     {
       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
       start_id = stats_watchers_->global_request_id + 1;
       end_id = start_id + request->num_rpcs();
-      watcher = new XdsStatsWatcher(start_id, end_id);
-      stats_watchers_->watchers.insert(watcher);
+      watcher = std::make_unique<XdsStatsWatcher>(
+          start_id, end_id,
+          std::vector<std::string>(request->metadata_keys().begin(),
+                                   request->metadata_keys().end()));
+      stats_watchers_->watchers.insert(watcher.get());
     }
-    watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
+    *response = watcher->WaitForRpcStatsResponse(request->timeout_sec());
     {
       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
-      stats_watchers_->watchers.erase(watcher);
+      stats_watchers_->watchers.erase(watcher.get());
     }
-    delete watcher;
     return Status::OK;
   }
 
@@ -356,8 +361,7 @@ void RunTestLoop(std::chrono::duration<double> duration_per_query,
   std::vector<RpcConfig> configs;
   while (true) {
     {
-      std::lock_guard<std::mutex> lockk(
-          rpc_configs_queue->mu_rpc_configs_queue);
+      std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
       if (!rpc_configs_queue->rpc_configs_queue.empty()) {
         configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
         rpc_configs_queue->rpc_configs_queue.pop_front();
@@ -464,7 +468,7 @@ int main(int argc, char** argv) {
   {
     std::lock_guard<std::mutex> lock(stats_watchers.mu);
-    stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
+    stats_watchers.global_watcher = new XdsStatsWatcher(0, 0, {});
     stats_watchers.watchers.insert(stats_watchers.global_watcher);
   }
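The only new data source the client taps for this feature is grpc::ClientContext::GetServerInitialMetadata(), which becomes valid once initial metadata has been received (here, after the call finished). Below is a standalone sketch of that access pattern, assuming the interop TestService stub and a synchronous EmptyCall for brevity; the interop client above uses the async API, and the include paths are assumptions.

#include <map>
#include <string>

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"

// Returns owned copies of the server's initial metadata for one RPC.
std::multimap<std::string, std::string> CallAndCollectMetadata(
    grpc::testing::TestService::Stub* stub) {
  grpc::ClientContext context;
  grpc::testing::Empty request;
  grpc::testing::Empty response;
  std::multimap<std::string, std::string> copy;
  grpc::Status status = stub->EmptyCall(&context, request, &response);
  if (!status.ok()) return copy;
  // GetServerInitialMetadata() may only be consulted once initial metadata
  // has arrived, i.e. after the call completed (or WaitForInitialMetadata()).
  for (const auto& kv : context.GetServerInitialMetadata()) {
    // grpc::string_ref is not NUL-terminated, so make owned copies before
    // handing the values to a watcher.
    copy.emplace(std::string(kv.first.data(), kv.first.length()),
                 std::string(kv.second.data(), kv.second.length()));
  }
  return copy;
}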

--- a/test/cpp/interop/xds_stats_watcher.cc
+++ b/test/cpp/interop/xds_stats_watcher.cc
@@ -19,11 +19,36 @@
 namespace grpc {
 namespace testing {
 
-XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id)
-    : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
+namespace {
+
+LoadBalancerStatsResponse::RpcMetadata BuildRpcMetadata(
+    absl::Span<const std::string> metadata_keys,
+    const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
+  LoadBalancerStatsResponse::RpcMetadata rpc_metadata;
+  for (const auto& key : metadata_keys) {
+    auto matching = initial_metadata.equal_range(key);
+    for (auto value = matching.first; value != matching.second; ++value) {
+      auto entry = rpc_metadata.add_metadata();
+      entry->set_key(key);
+      entry->set_value(
+          absl::string_view(value->second.data(), value->second.length()));
+    }
+  }
+  return rpc_metadata;
+}
+
+}  // namespace
+
+XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
+                                 absl::Span<const std::string> metadata_keys)
+    : start_id_(start_id),
+      end_id_(end_id),
+      rpcs_needed_(end_id - start_id),
+      metadata_keys_(metadata_keys.begin(), metadata_keys.end()) {}
 
-void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call,
-                                   const std::string& peer) {
+void XdsStatsWatcher::RpcCompleted(
+    const AsyncClientCallResult& call, const std::string& peer,
+    const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
   // We count RPCs for global watcher or if the request_id falls into the
   // watcher's interested range of request ids.
   if ((start_id_ == 0 && end_id_ == 0) ||
@@ -37,6 +62,8 @@ void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call,
     // RPC is counted into both per-peer bin and per-method-per-peer bin.
     rpcs_by_peer_[peer]++;
     rpcs_by_type_[call.rpc_type][peer]++;
+    *metadata_by_peer_[peer].add_rpc_metadata() =
+        BuildRpcMetadata(metadata_keys_, initial_metadata);
   }
   rpcs_needed_--;
   // Report accumulated stats.
@@ -55,14 +82,17 @@ void XdsStatsWatcher::RpcCompleted(const AsyncClientCallResult& call,
   }
 }
 
-void XdsStatsWatcher::WaitForRpcStatsResponse(
-    LoadBalancerStatsResponse* response, int timeout_sec) {
+LoadBalancerStatsResponse XdsStatsWatcher::WaitForRpcStatsResponse(
+    int timeout_sec) {
+  LoadBalancerStatsResponse response;
   std::unique_lock<std::mutex> lock(m_);
   cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
               [this] { return rpcs_needed_ == 0; });
-  response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
-                                           rpcs_by_peer_.end());
-  auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
+  response.mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
+                                          rpcs_by_peer_.end());
+  response.mutable_metadatas_by_peer()->insert(metadata_by_peer_.begin(),
+                                               metadata_by_peer_.end());
+  auto& response_rpcs_by_method = *response.mutable_rpcs_by_method();
   for (const auto& rpc_by_type : rpcs_by_type_) {
     std::string method_name;
     if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
@@ -83,7 +113,8 @@ void XdsStatsWatcher::WaitForRpcStatsResponse(
       response_rpc_by_peer = rpc_by_peer.second;
     }
   }
-  response->set_num_failures(no_remote_peer_ + rpcs_needed_);
+  response.set_num_failures(no_remote_peer_ + rpcs_needed_);
+  return response;
 }
 
 void XdsStatsWatcher::GetCurrentRpcStats(
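The core of BuildRpcMetadata is the multimap equal_range() lookup: every occurrence of a requested key is reported (so repeated header values survive), while unrequested keys are dropped. A self-contained sketch of that pattern with plain std::string keys (the real code uses grpc::string_ref and the generated proto types); the helper name and main() are illustrative only.

#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Collects (key, value) pairs for the requested keys only, preserving
// duplicate values, in the same spirit as BuildRpcMetadata above.
std::vector<std::pair<std::string, std::string>> ExtractRequestedKeys(
    const std::vector<std::string>& requested_keys,
    const std::multimap<std::string, std::string>& metadata) {
  std::vector<std::pair<std::string, std::string>> out;
  for (const auto& key : requested_keys) {
    auto range = metadata.equal_range(key);
    for (auto it = range.first; it != range.second; ++it) {
      out.emplace_back(it->first, it->second);
    }
  }
  return out;
}

int main() {
  const std::multimap<std::string, std::string> metadata = {
      {"k1", "v1"}, {"k1", "v2"}, {"k2", "v3"}, {"k3", "ignored"}};
  auto out = ExtractRequestedKeys({"k1", "k2"}, metadata);
  // "k3" was not requested; the two "k1" values and one "k2" value remain.
  assert(out.size() == 3);
  return 0;
}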

--- a/test/cpp/interop/xds_stats_watcher.h
+++ b/test/cpp/interop/xds_stats_watcher.h
@@ -32,6 +32,7 @@
 #include <vector>
 
 #include "absl/status/status.h"
+#include "absl/types/span.h"
 
 #include <grpcpp/grpcpp.h>
@@ -68,15 +69,17 @@ struct StatsWatchers {
 /// Records the remote peer distribution for a given range of RPCs.
 class XdsStatsWatcher {
  public:
-  XdsStatsWatcher(int start_id, int end_id);
+  XdsStatsWatcher(int start_id, int end_id,
+                  absl::Span<const std::string> metadata_keys);
 
   // Upon the completion of an RPC, we will look at the request_id, the
   // rpc_type, and the peer the RPC was sent to in order to count
   // this RPC into the right stats bin.
-  void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer);
+  void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer,
+                    const std::multimap<grpc::string_ref, grpc::string_ref>&
+                        initial_metadata);
 
-  void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
-                               int timeout_sec);
+  LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec);
 
   void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
                           StatsWatchers* stats_watchers);
@@ -96,6 +99,9 @@ class XdsStatsWatcher {
   LoadBalancerAccumulatedStatsResponse accumulated_stats_;
   std::mutex m_;
   std::condition_variable cv_;
+  std::vector<std::string> metadata_keys_;
+  std::map<std::string, LoadBalancerStatsResponse::MetadataByPeer>
+      metadata_by_peer_;
 };
 
 }  // namespace testing

--- a/test/cpp/interop/xds_stats_watcher_test.cc
+++ b/test/cpp/interop/xds_stats_watcher_test.cc
@@ -29,6 +29,7 @@
 namespace grpc {
 namespace testing {
 namespace {
+
 AsyncClientCallResult BuildCallResult(int saved_request_id) {
   AsyncClientCallResult result;
   result.saved_request_id = saved_request_id;
@@ -36,25 +37,63 @@ AsyncClientCallResult BuildCallResult(int saved_request_id) {
   return result;
 }
 
-TEST(XdsStatsWatcherTest, CollectsMetadata) {
-  XdsStatsWatcher watcher(0, 3);
-  watcher.RpcCompleted(BuildCallResult(0), "peer1");
-  watcher.RpcCompleted(BuildCallResult(1), "peer1");
-  watcher.RpcCompleted(BuildCallResult(2), "peer2");
-  LoadBalancerStatsResponse lb_response;
-  watcher.WaitForRpcStatsResponse(&lb_response, 1);
-  EXPECT_EQ(
-      (std::multimap<std::string, int32_t>(lb_response.rpcs_by_peer().begin(),
-                                           lb_response.rpcs_by_peer().end())),
-      (std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
-  EXPECT_EQ(lb_response.rpcs_by_method_size(), 1);
-  auto rpcs = lb_response.rpcs_by_method().find("UnaryCall");
-  EXPECT_NE(rpcs, lb_response.rpcs_by_method().end());
-  std::multimap<std::string, int32_t> by_peer(
-      rpcs->second.rpcs_by_peer().begin(), rpcs->second.rpcs_by_peer().end());
-  EXPECT_EQ(
-      by_peer,
-      (std::multimap<std::string, int32_t>({{"peer1", 2}, {"peer2", 1}})));
+LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas(
+    const std::initializer_list<
+        std::initializer_list<std::pair<std::string, std::string>>>& values) {
+  LoadBalancerStatsResponse::MetadataByPeer metadata_by_peer;
+  for (const auto& per_rpc : values) {
+    auto rpc_metadata = metadata_by_peer.add_rpc_metadata();
+    for (const auto& key_value : per_rpc) {
+      auto entry = rpc_metadata->add_metadata();
+      entry->set_key(key_value.first);
+      entry->set_value(key_value.second);
+    }
+  }
+  return metadata_by_peer;
+}
+
+TEST(XdsStatsWatcherTest, WaitForRpcStatsResponse) {
+  // "k3" will be ignored
+  XdsStatsWatcher watcher(0, 3, {"k1", "k2"});
+  watcher.RpcCompleted(BuildCallResult(0), "peer1",
+                       {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
+  watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}});
+  watcher.RpcCompleted(BuildCallResult(2), "peer2",
+                       {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
+  LoadBalancerStatsResponse expected;
+  expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
+  expected.mutable_metadatas_by_peer()->insert({
+      {"peer1", BuildMetadatas({{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})},
+      {"peer2", BuildMetadatas({{{"k1", "v5"}, {"k2", "v6"}}})},
+  });
+  (*expected.mutable_rpcs_by_method())["UnaryCall"]
+      .mutable_rpcs_by_peer()
+      ->insert({{"peer1", 2}, {"peer2", 1}});
+  EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
+            expected.DebugString());
+}
+
+TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) {
+  XdsStatsWatcher watcher(0, 3, {});
+  // RPC had metadata - but watcher should ignore it
+  watcher.RpcCompleted(BuildCallResult(0), "peer1",
+                       {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
+  // No metadata came with RPC
+  watcher.RpcCompleted(BuildCallResult(1), "peer1", {});
+  watcher.RpcCompleted(BuildCallResult(2), "peer2",
+                       {{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
+  LoadBalancerStatsResponse expected;
+  expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
+  // There will still be an empty metadata collection for each RPC
+  expected.mutable_metadatas_by_peer()->insert({
+      {"peer1", BuildMetadatas({{}, {}})},
+      {"peer2", BuildMetadatas({{}})},
+  });
+  (*expected.mutable_rpcs_by_method())["UnaryCall"]
+      .mutable_rpcs_by_peer()
+      ->insert({{"peer1", 2}, {"peer2", 1}});
+  EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
+            expected.DebugString());
 }
 
 }  // namespace
