[PSM Interop] Return trailing metadata. (#34096)

1. Trailing metadata is now reported.
2. messages.proto was synchronized.
3. Corrected order of arguments in EXPECT_EQ so the output makes sense now.
pull/34174/head
Eugene Ostroukhov 2 years ago committed by GitHub
parent aaf6c3475c
commit 88df0a1c71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      src/proto/grpc/testing/messages.proto
  2. 3
      test/cpp/interop/xds_interop_client.cc
  3. 24
      test/cpp/interop/xds_stats_watcher.cc
  4. 8
      test/cpp/interop/xds_stats_watcher.h
  5. 104
      test/cpp/interop/xds_stats_watcher_test.cc

@ -1,4 +1,3 @@
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -207,20 +206,29 @@ message LoadBalancerStatsRequest {
int32 num_rpcs = 1;
// If num_rpcs have not completed within timeout_sec, return partial results.
int32 timeout_sec = 2;
// response header+trailer we want the values of
// Response header + trailer metadata entries we want the values of.
// Matching of the keys is case-insensitive as per rfc7540#section-8.1.2
// * (asterisk) is a special value that will return all metadata entries
repeated string metadata_keys = 3;
}
message LoadBalancerStatsResponse {
enum MetadataType {
Initial = 0;
Trailing = 1;
}
message MetadataEntry {
// Key, exactly as received from the server. Case may be different from what
// was requested in the LoadBalancerStatsRequest)
string key = 1;
// Value, exactly as received from the server.
string value = 2;
// Metadata type is either Initial or Trailing
MetadataType type = 3;
}
message RpcMetadata {
// metadata values for each rpc for the keys specified in
// LoadBalancerStatsRequest.metadata_keys.
// metadata keys and values are returned exactly as was received
// from the server.
repeated MetadataEntry metadata = 1;
}
message MetadataByPeer {

@ -209,7 +209,8 @@ class TestClient {
: call->result.simple_response.hostname();
for (auto watcher : stats_watchers_->watchers) {
watcher->RpcCompleted(call->result, hostname,
call->context.GetServerInitialMetadata());
call->context.GetServerInitialMetadata(),
call->context.GetServerTrailingMetadata());
}
}

@ -24,21 +24,22 @@ namespace testing {
namespace {
LoadBalancerStatsResponse::RpcMetadata BuildRpcMetadata(
void AddRpcMetadata(
LoadBalancerStatsResponse::RpcMetadata* rpc_metadata,
const std::unordered_set<std::string>& included_keys, bool include_all_keys,
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
LoadBalancerStatsResponse::RpcMetadata rpc_metadata;
for (const auto& key_value : initial_metadata) {
const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
LoadBalancerStatsResponse::MetadataType type) {
for (const auto& key_value : metadata) {
absl::string_view key(key_value.first.data(), key_value.first.length());
if (include_all_keys ||
included_keys.find(absl::AsciiStrToLower(key)) != included_keys.end()) {
auto entry = rpc_metadata.add_metadata();
auto entry = rpc_metadata->add_metadata();
entry->set_key(key);
entry->set_value(absl::string_view(key_value.second.data(),
key_value.second.length()));
entry->set_type(type);
}
}
return rpc_metadata;
}
std::unordered_set<std::string> ToLowerCase(
@ -65,7 +66,9 @@ XdsStatsWatcher::XdsStatsWatcher(int start_id, int end_id,
void XdsStatsWatcher::RpcCompleted(
const AsyncClientCallResult& call, const std::string& peer,
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata) {
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
const std::multimap<grpc::string_ref, grpc::string_ref>&
trailing_metadata) {
// We count RPCs for global watcher or if the request_id falls into the
// watcher's interested range of request ids.
if ((start_id_ == 0 && end_id_ == 0) ||
@ -79,8 +82,11 @@ void XdsStatsWatcher::RpcCompleted(
// RPC is counted into both per-peer bin and per-method-per-peer bin.
rpcs_by_peer_[peer]++;
rpcs_by_type_[call.rpc_type][peer]++;
*metadata_by_peer_[peer].add_rpc_metadata() = BuildRpcMetadata(
metadata_keys_, include_all_metadata_, initial_metadata);
auto* rpc_metadata = metadata_by_peer_[peer].add_rpc_metadata();
AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
initial_metadata, LoadBalancerStatsResponse::Initial);
AddRpcMetadata(rpc_metadata, metadata_keys_, include_all_metadata_,
trailing_metadata, LoadBalancerStatsResponse::Trailing);
}
rpcs_needed_--;
// Report accumulated stats.

@ -76,9 +76,11 @@ class XdsStatsWatcher {
// Upon the completion of an RPC, we will look at the request_id, the
// rpc_type, and the peer the RPC was sent to in order to count
// this RPC into the right stats bin.
void RpcCompleted(const AsyncClientCallResult& call, const std::string& peer,
const std::multimap<grpc::string_ref, grpc::string_ref>&
initial_metadata);
void RpcCompleted(
const AsyncClientCallResult& call, const std::string& peer,
const std::multimap<grpc::string_ref, grpc::string_ref>& initial_metadata,
const std::multimap<grpc::string_ref, grpc::string_ref>&
trailing_metadata);
LoadBalancerStatsResponse WaitForRpcStatsResponse(int timeout_sec);

@ -37,16 +37,25 @@ AsyncClientCallResult BuildCallResult(int saved_request_id) {
return result;
}
struct MetadataEntryInit {
absl::string_view key;
absl::string_view value;
bool is_trailing;
};
LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas(
const std::initializer_list<
std::initializer_list<std::pair<std::string, std::string>>>& values) {
const std::initializer_list<std::initializer_list<MetadataEntryInit>>&
values) {
LoadBalancerStatsResponse::MetadataByPeer metadata_by_peer;
for (const auto& per_rpc : values) {
auto rpc_metadata = metadata_by_peer.add_rpc_metadata();
for (const auto& key_value : per_rpc) {
auto entry = rpc_metadata->add_metadata();
entry->set_key(key_value.first);
entry->set_value(key_value.second);
entry->set_key(key_value.key);
entry->set_value(key_value.value);
entry->set_type(key_value.is_trailing
? LoadBalancerStatsResponse::Trailing
: LoadBalancerStatsResponse::Initial);
}
}
return metadata_by_peer;
@ -54,76 +63,103 @@ LoadBalancerStatsResponse::MetadataByPeer BuildMetadatas(
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponse) {
// "k3" will be ignored
XdsStatsWatcher watcher(0, 3, {"k1", "k2"});
XdsStatsWatcher watcher(0, 4, {"k1", "k2"});
watcher.RpcCompleted(BuildCallResult(0), "peer1",
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}},
{{"k1", "t1"}, {"k3", "t3"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}}, {});
watcher.RpcCompleted(BuildCallResult(2), "peer1", {}, {});
watcher.RpcCompleted(BuildCallResult(3), "peer2",
{{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}},
{{"k1", "t5"}, {"k3", "t7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
expected.mutable_rpcs_by_peer()->insert({{"peer1", 3}, {"peer2", 1}});
expected.mutable_metadatas_by_peer()->insert({
{"peer1", BuildMetadatas({{{"k1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})},
{"peer2", BuildMetadatas({{{"k1", "v5"}, {"k2", "v6"}}})},
{"peer1",
BuildMetadatas({
{{"k1", "v1", false}, {"k2", "v2", false}, {"k1", "t1", true}},
{{"k1", "v4", false}},
{},
})},
{"peer2",
BuildMetadatas({
{{"k1", "v5", false}, {"k2", "v6", false}, {"k1", "t5", true}},
})},
});
(*expected.mutable_rpcs_by_method())["UnaryCall"]
.mutable_rpcs_by_peer()
->insert({{"peer1", 2}, {"peer2", 1}});
EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
expected.DebugString());
->insert({{"peer1", 3}, {"peer2", 1}});
EXPECT_EQ(expected.DebugString(),
watcher.WaitForRpcStatsResponse(0).DebugString());
}
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresCase) {
// "k3" will be ignored
XdsStatsWatcher watcher(0, 3, {"k1", "K2"});
watcher.RpcCompleted(BuildCallResult(0), "peer1",
{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}},
{{"K1", "t1"}, {"k2", "t2"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {}, {});
watcher.RpcCompleted(BuildCallResult(2), "peer2", {},
{{"k1", "v5"}, {"K2", "v6"}, {"k3", "v7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
expected.mutable_metadatas_by_peer()->insert({
{"peer1", BuildMetadatas({{{"K1", "v1"}, {"k2", "v2"}}, {{"k1", "v4"}}})},
{"peer2", BuildMetadatas({{{"K2", "v6"}, {"k1", "v5"}}})},
{"peer1", BuildMetadatas({
{{"K1", "v1", false},
{"k2", "v2", false},
{"K1", "t1", true},
{"k2", "t2", true}},
{},
})},
{"peer2", BuildMetadatas({{{"K2", "v6", true}, {"k1", "v5", true}}})},
});
(*expected.mutable_rpcs_by_method())["UnaryCall"]
.mutable_rpcs_by_peer()
->insert({{"peer1", 2}, {"peer2", 1}});
EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
expected.DebugString());
EXPECT_EQ(expected.DebugString(),
watcher.WaitForRpcStatsResponse(0).DebugString());
}
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseReturnsAll) {
// "k3" will be ignored
XdsStatsWatcher watcher(0, 3, {"*"});
watcher.RpcCompleted(BuildCallResult(0), "peer1",
{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}},
{{"K1", "t1"}, {"k2", "t2"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {}, {});
watcher.RpcCompleted(BuildCallResult(2), "peer2", {},
{{"k1", "v5"}, {"K2", "v6"}, {"k3", "v7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});
expected.mutable_metadatas_by_peer()->insert({
{"peer1", BuildMetadatas({{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}},
{{"k1", "v4"}}})},
{"peer2", BuildMetadatas({{{"K2", "v6"}, {"k1", "v5"}, {"k3", "v7"}}})},
{"peer1", BuildMetadatas({
{{"K1", "v1", false},
{"k2", "v2", false},
{"k3", "v3", false},
{"K1", "t1", true},
{"k2", "t2", true}},
{},
})},
{"peer2",
BuildMetadatas(
{{{"K2", "v6", true}, {"k1", "v5", true}, {"k3", "v7", true}}})},
});
(*expected.mutable_rpcs_by_method())["UnaryCall"]
.mutable_rpcs_by_peer()
->insert({{"peer1", 2}, {"peer2", 1}});
EXPECT_EQ(watcher.WaitForRpcStatsResponse(0).DebugString(),
expected.DebugString());
EXPECT_EQ(expected.DebugString(),
watcher.WaitForRpcStatsResponse(0).DebugString());
}
TEST(XdsStatsWatcherTest, WaitForRpcStatsResponseIgnoresMetadata) {
XdsStatsWatcher watcher(0, 3, {});
// RPC had metadata - but watcher should ignore it
watcher.RpcCompleted(BuildCallResult(0), "peer1",
{{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}});
// No metadata came with RPC
watcher.RpcCompleted(BuildCallResult(1), "peer1", {});
watcher.RpcCompleted(BuildCallResult(2), "peer2",
{{"K1", "v1"}, {"k2", "v2"}, {"k3", "v3"}},
{{"K1", "t1"}, {"k2", "t2"}});
watcher.RpcCompleted(BuildCallResult(1), "peer1", {{"k1", "v4"}}, {});
watcher.RpcCompleted(BuildCallResult(2), "peer2", {},
{{"k1", "v5"}, {"k2", "v6"}, {"k3", "v7"}});
LoadBalancerStatsResponse expected;
expected.mutable_rpcs_by_peer()->insert({{"peer1", 2}, {"peer2", 1}});

Loading…
Cancel
Save