xDS: Include orca named_metrics in LRS load reports (#32690)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32744/head
Yousuk Seung 2 years ago committed by GitHub
parent ed38592d76
commit c02b3e695c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 10
      include/grpcpp/ext/call_metric_recorder.h
  3. 1
      src/core/BUILD
  4. 7
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  5. 5
      src/core/ext/filters/client_channel/backend_metric.cc
  6. 3
      src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
  7. 9
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  8. 44
      src/core/ext/xds/xds_client_stats.cc
  9. 44
      src/core/ext/xds/xds_client_stats.h
  10. 12
      src/core/lib/gprpp/per_cpu.h
  11. 21
      src/cpp/server/backend_metric_recorder.cc
  12. 3
      src/cpp/server/backend_metric_recorder.h
  13. 20
      test/cpp/end2end/client_lb_end2end_test.cc
  14. 119
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  15. 3
      test/cpp/end2end/xds/xds_end2end_test_lib.cc
  16. 19
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  17. 22
      test/cpp/end2end/xds/xds_server.h

@ -3484,6 +3484,7 @@ grpc_cc_library(
"//src/core:dual_ref_counted",
"//src/core:env",
"//src/core:json",
"//src/core:per_cpu",
"//src/core:ref_counted",
"//src/core:time",
"//src/core:upb_utils",

@ -78,6 +78,16 @@ class CallMetricRecorder {
/// are global constants.
virtual CallMetricRecorder& RecordRequestCostMetric(string_ref name,
double value) = 0;
/// Records an application-specific opaque metric measurement.
/// Multiple calls to this method with the same name will
/// override the corresponding stored value. The lifetime of the
/// name string needs to be longer than the lifetime of the RPC
/// itself, since it's going to be sent as trailers after the RPC
/// finishes. It is assumed the strings are common names that
/// are global constants.
virtual CallMetricRecorder& RecordNamedMetric(string_ref name,
double value) = 0;
};
} // namespace experimental

@ -4239,6 +4239,7 @@ grpc_cc_library(
language = "c++",
deps = [
"channel_args",
"grpc_backend_metric_data",
"grpc_lb_xds_attributes",
"grpc_lb_xds_channel_args",
"grpc_xds_client",

@ -89,6 +89,13 @@ absl::optional<std::string> BackendMetricFilter::MaybeSerializeBackendMetrics(
p.second, arena.ptr());
has_data = true;
}
for (const auto& p : data.named_metrics) {
xds_data_orca_v3_OrcaLoadReport_named_metrics_set(
response,
upb_StringView_FromDataAndSize(p.first.data(), p.first.size()),
p.second, arena.ptr());
has_data = true;
}
if (!has_data) {
return absl::nullopt;
}

@ -81,6 +81,11 @@ const BackendMetricData* ParseBackendMetricData(
msg, xds_data_orca_v3_OrcaLoadReport_utilization_next,
xds_data_orca_v3_OrcaLoadReport_UtilizationEntry_key,
xds_data_orca_v3_OrcaLoadReport_UtilizationEntry_value, allocator);
backend_metric_data->named_metrics =
ParseMap<xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry>(
msg, xds_data_orca_v3_OrcaLoadReport_named_metrics_next,
xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry_key,
xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry_value, allocator);
return backend_metric_data;
}

@ -44,6 +44,9 @@ struct BackendMetricData {
/// are determined by the application. Each value is expressed as a
/// fraction of total resources available.
std::map<absl::string_view, double> utilization;
/// Application-specific opaque metrics. Metric names are determined by the
/// the application. Each value is an opaque measurement.
std::map<absl::string_view, double> named_metrics;
};
} // namespace grpc_core

@ -38,6 +38,7 @@
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
@ -332,7 +333,13 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker
}
// Record call completion for load reporting.
if (locality_stats_ != nullptr) {
locality_stats_->AddCallFinished(!args.status.ok());
auto* backend_metric_data =
args.backend_metric_accessor->GetBackendMetricData();
const std::map<absl::string_view, double>* named_metrics = nullptr;
if (backend_metric_data != nullptr) {
named_metrics = &backend_metric_data->named_metrics;
}
locality_stats_->AddCallFinished(named_metrics, !args.status.ok());
}
// Decrement number of calls in flight.
call_counter_->Decrement();

@ -131,29 +131,43 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() {
XdsClusterLocalityStats::Snapshot
XdsClusterLocalityStats::GetSnapshotAndReset() {
Snapshot snapshot = {
GetAndResetCounter(&total_successful_requests_),
// Don't reset total_requests_in_progress because it's
// not related to a single reporting interval.
total_requests_in_progress_.load(std::memory_order_relaxed),
GetAndResetCounter(&total_error_requests_),
GetAndResetCounter(&total_issued_requests_),
{}};
MutexLock lock(&backend_metrics_mu_);
snapshot.backend_metrics = std::move(backend_metrics_);
Snapshot snapshot;
for (auto& percpu_stats : stats_) {
Snapshot percpu_snapshot = {
GetAndResetCounter(&percpu_stats.total_successful_requests),
// Don't reset total_requests_in_progress because it's
// not related to a single reporting interval.
percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
GetAndResetCounter(&percpu_stats.total_error_requests),
GetAndResetCounter(&percpu_stats.total_issued_requests),
{}};
{
MutexLock lock(&percpu_stats.backend_metrics_mu);
percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
}
snapshot += percpu_snapshot;
}
return snapshot;
}
void XdsClusterLocalityStats::AddCallStarted() {
total_issued_requests_.fetch_add(1, std::memory_order_relaxed);
total_requests_in_progress_.fetch_add(1, std::memory_order_relaxed);
Stats& stats = stats_.this_cpu();
stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed);
stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed);
}
void XdsClusterLocalityStats::AddCallFinished(bool fail) {
void XdsClusterLocalityStats::AddCallFinished(
const std::map<absl::string_view, double>* named_metrics, bool fail) {
Stats& stats = stats_.this_cpu();
std::atomic<uint64_t>& to_increment =
fail ? total_error_requests_ : total_successful_requests_;
fail ? stats.total_error_requests : stats.total_successful_requests;
to_increment.fetch_add(1, std::memory_order_relaxed);
total_requests_in_progress_.fetch_add(-1, std::memory_order_acq_rel);
stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
if (named_metrics == nullptr) return;
MutexLock lock(&stats.backend_metrics_mu);
for (const auto& m : *named_metrics) {
stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second};
}
}
} // namespace grpc_core

@ -34,6 +34,7 @@
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -159,8 +160,8 @@ class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> {
class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
public:
struct BackendMetric {
uint64_t num_requests_finished_with_metric;
double total_metric_value;
uint64_t num_requests_finished_with_metric = 0;
double total_metric_value = 0;
BackendMetric& operator+=(const BackendMetric& other) {
num_requests_finished_with_metric +=
@ -175,10 +176,10 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
};
struct Snapshot {
uint64_t total_successful_requests;
uint64_t total_requests_in_progress;
uint64_t total_error_requests;
uint64_t total_issued_requests;
uint64_t total_successful_requests = 0;
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
std::map<std::string, BackendMetric> backend_metrics;
Snapshot& operator+=(const Snapshot& other) {
@ -215,27 +216,30 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
Snapshot GetSnapshotAndReset();
void AddCallStarted();
void AddCallFinished(bool fail = false);
void AddCallFinished(const std::map<absl::string_view, double>* named_metrics,
bool fail = false);
private:
struct Stats {
std::atomic<uint64_t> total_successful_requests{0};
std::atomic<uint64_t> total_requests_in_progress{0};
std::atomic<uint64_t> total_error_requests{0};
std::atomic<uint64_t> total_issued_requests{0};
// Protects backend_metrics. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata and the load reporting thread.
Mutex backend_metrics_mu;
std::map<std::string, BackendMetric> backend_metrics
ABSL_GUARDED_BY(backend_metrics_mu);
};
RefCountedPtr<XdsClient> xds_client_;
const XdsBootstrap::XdsServer& lrs_server_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
std::atomic<uint64_t> total_successful_requests_{0};
std::atomic<uint64_t> total_requests_in_progress_{0};
std::atomic<uint64_t> total_error_requests_{0};
std::atomic<uint64_t> total_issued_requests_{0};
// Protects backend_metrics_. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata (not from the control plane work serializer)
// and the load reporting thread (from the control plane work serializer).
Mutex backend_metrics_mu_;
std::map<std::string, BackendMetric> backend_metrics_
ABSL_GUARDED_BY(backend_metrics_mu_);
PerCpu<Stats> stats_{32};
};
} // namespace grpc_core

@ -17,7 +17,9 @@
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <cstddef>
#include <limits>
#include <memory>
#include <grpc/support/cpu.h>
@ -29,7 +31,11 @@ namespace grpc_core {
template <typename T>
class PerCpu {
public:
T& this_cpu() { return data_[ExecCtx::Get()->starting_cpu()]; }
explicit PerCpu(size_t max = std::numeric_limits<size_t>::max())
: cpus_(std::min<size_t>(max, gpr_cpu_num_cores())),
data_{new T[cpus_]} {}
T& this_cpu() { return data_[ExecCtx::Get()->starting_cpu() % cpus_]; }
T* begin() { return data_.get(); }
T* end() { return data_.get() + cpus_; }
@ -37,8 +43,8 @@ class PerCpu {
const T* end() const { return data_.get() + cpus_; }
private:
const size_t cpus_ = gpr_cpu_num_cores();
std::unique_ptr<T[]> data_{new T[cpus_]};
const size_t cpus_;
std::unique_ptr<T[]> data_;
};
} // namespace grpc_core

@ -309,6 +309,18 @@ experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
string_ref name, double value) {
internal::MutexLock lock(&mu_);
absl::string_view name_sv(name.data(), name.length());
named_metrics_[name_sv] = value;
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Named metric recorded: %s %f", this,
std::string(name_sv).c_str(), value);
}
return *this;
}
BackendMetricData BackendMetricState::GetBackendMetricData() {
// Merge metrics from the ServerMetricRecorder first since metrics recorded
// to CallMetricRecorder takes a higher precedence.
@ -341,13 +353,18 @@ BackendMetricData BackendMetricState::GetBackendMetricData() {
for (const auto& r : request_cost_) {
data.request_cost[r.first] = r.second;
}
for (const auto& r : named_metrics_) {
data.named_metrics[r.first] = r.second;
}
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO,
"[%p] Backend metric data returned: cpu:%f mem:%f qps:%f eps:%f "
"utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR,
"utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR
"named_metrics size:%" PRIuPTR,
this, data.cpu_utilization, data.mem_utilization, data.qps,
data.eps, data.utilization.size(), data.request_cost.size());
data.eps, data.utilization.size(), data.request_cost.size(),
data.named_metrics.size());
}
return data;
}

@ -64,6 +64,8 @@ class BackendMetricState : public grpc_core::BackendMetricProvider,
string_ref name, double value) override;
experimental::CallMetricRecorder& RecordRequestCostMetric(
string_ref name, double value) override;
experimental::CallMetricRecorder& RecordNamedMetric(string_ref name,
double value) override;
// This clears metrics currently recorded. Don't call twice.
grpc_core::BackendMetricData GetBackendMetricData() override;
@ -76,6 +78,7 @@ class BackendMetricState : public grpc_core::BackendMetricProvider,
internal::Mutex mu_;
std::map<absl::string_view, double> utilization_ ABSL_GUARDED_BY(mu_);
std::map<absl::string_view, double> request_cost_ ABSL_GUARDED_BY(mu_);
std::map<absl::string_view, double> named_metrics_ ABSL_GUARDED_BY(mu_);
};
} // namespace grpc

@ -155,6 +155,13 @@ class MyTestServiceImpl : public TestServiceImpl {
key[p.first.size()] = '\0';
recorder->RecordUtilizationMetric(key, p.second);
}
for (const auto& p : request_metrics.named_metrics()) {
char* key = static_cast<char*>(
grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
strncpy(key, p.first.data(), p.first.size());
key[p.first.size()] = '\0';
recorder->RecordNamedMetric(key, p.second);
}
}
return TestServiceImpl::Echo(context, request, response);
}
@ -2357,6 +2364,10 @@ xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport(
std::string name(p.first);
(*load_report.mutable_utilization())[name] = p.second;
}
for (const auto& p : backend_metric_data.named_metrics) {
std::string name(p.first);
(*load_report.mutable_named_metrics())[name] = p.second;
}
return load_report;
}
@ -2593,6 +2604,12 @@ void CheckLoadReportAsExpected(
ASSERT_NE(it, expected.utilization().end());
EXPECT_EQ(it->second, p.second);
}
EXPECT_EQ(actual.named_metrics().size(), expected.named_metrics().size());
for (const auto& p : actual.named_metrics()) {
auto it = expected.named_metrics().find(p.first);
ASSERT_NE(it, expected.named_metrics().end());
EXPECT_EQ(it->second, p.second);
}
}
TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
@ -2613,6 +2630,9 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
// This will be rejected.
utilization["out_of_range_invalid1"] = 1.1;
utilization["out_of_range_invalid2"] = -1.1;
auto& named_metrics = *load_report.mutable_named_metrics();
named_metrics["metric0"] = 3.0;
named_metrics["metric1"] = -1.0;
auto expected = load_report;
expected.mutable_utilization()->erase("out_of_range_invalid1");
expected.mutable_utilization()->erase("out_of_range_invalid2");

@ -26,6 +26,7 @@
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/config/config_vars.h"
#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
#include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -1375,6 +1376,18 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Values(XdsTestType().set_enable_load_reporting()),
&XdsTestType::Name);
MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value,
"equals LoadMetric") {
bool match = true;
match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric,
arg.num_requests_finished_with_metric,
result_listener);
match &=
::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value),
arg.total_metric_value, result_listener);
return match;
}
// Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest, Vanilla) {
CreateAndStartBackends(4);
@ -1389,11 +1402,19 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true));
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
@ -1425,6 +1446,8 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
@ -1432,10 +1455,30 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(
named_metrics_total,
::testing::UnorderedElementsAre(
::testing::Pair(
"foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.3)),
::testing::Pair(
"bar",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 2.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.4))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
@ -1452,10 +1495,18 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) {
// Wait until all backends are ready.
size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
// Send kNumRpcsPerAddress RPCs per server.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
xds::data::orca::v3::OrcaLoadReport backend_metrics;
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true));
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
// Check that each backend got the right number of requests.
for (size_t i = 0; i < backends_.size(); ++i) {
@ -1476,6 +1527,29 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) {
EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
EXPECT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::Field(
&ClientStats::LocalityStats::load_metrics,
::testing::UnorderedElementsAre(
::testing::Pair(
"foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) *
0.3)),
::testing::Pair(
"bar",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 2.0 +
(kNumFailuresPerAddress * backends_.size()) *
0.4)))))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
@ -1515,6 +1589,11 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
EXPECT_THAT(client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::Field(&ClientStats::LocalityStats::load_metrics,
::testing::IsEmpty()))));
// Shut down the balancer.
balancer_->Shutdown();
// We should continue using the last EDS response we received from the
@ -1537,7 +1616,12 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
// This tells us that we're now using the new serverlist.
num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
// Send one RPC per backend.
CheckRpcSendOk(DEBUG_LOCATION, 2);
xds::data::orca::v3::OrcaLoadReport backend_metrics;
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, 2,
RpcOptions().set_backend_metrics(backend_metrics));
num_rpcs += 2;
// Check client stats.
load_report = balancer_->lrs_service()->WaitForLoadReport();
@ -1547,6 +1631,14 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) {
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(0U, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
EXPECT_THAT(client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair(
"locality0",
::testing::Field(
&ClientStats::LocalityStats::load_metrics,
::testing::UnorderedElementsAre(
::testing::Pair("foo", LoadMetricEq(2, 2.0)),
::testing::Pair("bar", LoadMetricEq(2, 4.0)))))));
}
// Tests load reporting when switching over from one cluster to another.
@ -1600,7 +1692,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
0UL),
::testing::Field(
&ClientStats::LocalityStats::total_issued_requests,
num_rpcs))))),
num_rpcs),
::testing::Field(
&ClientStats::LocalityStats::load_metrics,
::testing::IsEmpty()))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
// Change RDS resource to point to new cluster.
RouteConfiguration new_route_config = default_route_config_;
@ -1638,7 +1733,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
0UL),
::testing::Field(&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_rpcs)))))),
::testing::Le(num_rpcs)),
::testing::Field(
&ClientStats::LocalityStats::load_metrics,
::testing::IsEmpty()))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
::testing::AllOf(
::testing::Property(&ClientStats::cluster_name, kNewClusterName),
@ -1660,7 +1758,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) {
0UL),
::testing::Field(&ClientStats::LocalityStats::
total_issued_requests,
::testing::Le(num_rpcs)))))),
::testing::Le(num_rpcs)),
::testing::Field(
&ClientStats::LocalityStats::load_metrics,
::testing::IsEmpty()))))),
::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
size_t total_ok = 0;
for (const ClientStats& client_stats : load_report) {

@ -435,6 +435,9 @@ void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context,
if (skip_cancelled_check) {
request->mutable_param()->set_skip_cancelled_check(true);
}
if (backend_metrics.has_value()) {
*request->mutable_param()->mutable_backend_metrics() = *backend_metrics;
}
}
//

@ -44,6 +44,7 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
#include "test/core/util/port.h"
#include "test/cpp/end2end/counted_service.h"
#include "test/cpp/end2end/test_service_impl.h"
@ -304,6 +305,17 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
last_peer_identity_.emplace_back(entry.data(), entry.size());
}
}
if (request->has_param() && request->param().has_backend_metrics()) {
const auto& request_metrics = request->param().backend_metrics();
auto* recorder = context->ExperimentalGetCallMetricRecorder();
for (const auto& p : request_metrics.named_metrics()) {
char* key = static_cast<char*>(
grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
strncpy(key, p.first.data(), p.first.size());
key[p.first.size()] = '\0';
recorder->RecordNamedMetric(key, p.second);
}
}
return status;
}
@ -710,6 +722,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
int client_cancel_after_us = 0;
bool skip_cancelled_check = false;
StatusCode server_expected_error = StatusCode::OK;
absl::optional<xds::data::orca::v3::OrcaLoadReport> backend_metrics;
RpcOptions() {}
@ -769,6 +782,12 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType> {
return *this;
}
RpcOptions& set_backend_metrics(
absl::optional<xds::data::orca::v3::OrcaLoadReport> metrics) {
backend_metrics = std::move(metrics);
return *this;
}
// Populates context and request.
void SetupRpc(ClientContext* context, EchoRequest* request) const;
};

@ -612,6 +612,17 @@ class LrsServiceImpl
public:
// Stats for a given locality.
struct LocalityStats {
struct LoadMetric {
uint64_t num_requests_finished_with_metric;
double total_metric_value;
LoadMetric& operator+=(const LoadMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
total_metric_value += other.total_metric_value;
return *this;
}
};
LocalityStats() {}
// Converts from proto message class.
@ -625,13 +636,21 @@ class LrsServiceImpl
total_error_requests(
upstream_locality_stats.total_error_requests()),
total_issued_requests(
upstream_locality_stats.total_issued_requests()) {}
upstream_locality_stats.total_issued_requests()) {
for (const auto& s : upstream_locality_stats.load_metric_stats()) {
load_metrics[s.metric_name()] += LoadMetric{
s.num_requests_finished_with_metric(), s.total_metric_value()};
}
}
LocalityStats& operator+=(const LocalityStats& other) {
total_successful_requests += other.total_successful_requests;
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
for (const auto& p : other.load_metrics) {
load_metrics[p.first] += p.second;
}
return *this;
}
@ -639,6 +658,7 @@ class LrsServiceImpl
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
std::map<std::string, LoadMetric> load_metrics;
};
ClientStats() {}

Loading…
Cancel
Save