From c02b3e695cd2fea7b611dad9926845df0b4ee804 Mon Sep 17 00:00:00 2001 From: Yousuk Seung Date: Tue, 28 Mar 2023 12:04:01 -0700 Subject: [PATCH] xDS: Include orca named_metrics in LRS load reports (#32690) --- BUILD | 1 + include/grpcpp/ext/call_metric_recorder.h | 10 ++ src/core/BUILD | 1 + .../backend_metrics/backend_metric_filter.cc | 7 ++ .../filters/client_channel/backend_metric.cc | 5 + .../lb_policy/backend_metric_data.h | 3 + .../lb_policy/xds/xds_cluster_impl.cc | 9 +- src/core/ext/xds/xds_client_stats.cc | 44 ++++--- src/core/ext/xds/xds_client_stats.h | 44 ++++--- src/core/lib/gprpp/per_cpu.h | 12 +- src/cpp/server/backend_metric_recorder.cc | 21 +++- src/cpp/server/backend_metric_recorder.h | 3 + test/cpp/end2end/client_lb_end2end_test.cc | 20 +++ .../end2end/xds/xds_cluster_end2end_test.cc | 119 ++++++++++++++++-- test/cpp/end2end/xds/xds_end2end_test_lib.cc | 3 + test/cpp/end2end/xds/xds_end2end_test_lib.h | 19 +++ test/cpp/end2end/xds/xds_server.h | 22 +++- 17 files changed, 292 insertions(+), 51 deletions(-) diff --git a/BUILD b/BUILD index a485e0e2517..ed64d0b038f 100644 --- a/BUILD +++ b/BUILD @@ -3484,6 +3484,7 @@ grpc_cc_library( "//src/core:dual_ref_counted", "//src/core:env", "//src/core:json", + "//src/core:per_cpu", "//src/core:ref_counted", "//src/core:time", "//src/core:upb_utils", diff --git a/include/grpcpp/ext/call_metric_recorder.h b/include/grpcpp/ext/call_metric_recorder.h index 3f64b0cad76..140a0b18278 100644 --- a/include/grpcpp/ext/call_metric_recorder.h +++ b/include/grpcpp/ext/call_metric_recorder.h @@ -78,6 +78,16 @@ class CallMetricRecorder { /// are global constants. virtual CallMetricRecorder& RecordRequestCostMetric(string_ref name, double value) = 0; + + /// Records an application-specific opaque metric measurement. + /// Multiple calls to this method with the same name will + /// override the corresponding stored value. 
The lifetime of the + /// name string needs to be longer than the lifetime of the RPC + /// itself, since it's going to be sent as trailers after the RPC + /// finishes. It is assumed the strings are common names that + /// are global constants. + virtual CallMetricRecorder& RecordNamedMetric(string_ref name, + double value) = 0; }; } // namespace experimental diff --git a/src/core/BUILD b/src/core/BUILD index c01ff574086..3f8ef0d0547 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4239,6 +4239,7 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", + "grpc_backend_metric_data", "grpc_lb_xds_attributes", "grpc_lb_xds_channel_args", "grpc_xds_client", diff --git a/src/core/ext/filters/backend_metrics/backend_metric_filter.cc b/src/core/ext/filters/backend_metrics/backend_metric_filter.cc index 311537490ce..2eb13bcd888 100644 --- a/src/core/ext/filters/backend_metrics/backend_metric_filter.cc +++ b/src/core/ext/filters/backend_metrics/backend_metric_filter.cc @@ -89,6 +89,13 @@ absl::optional BackendMetricFilter::MaybeSerializeBackendMetrics( p.second, arena.ptr()); has_data = true; } + for (const auto& p : data.named_metrics) { + xds_data_orca_v3_OrcaLoadReport_named_metrics_set( + response, + upb_StringView_FromDataAndSize(p.first.data(), p.first.size()), + p.second, arena.ptr()); + has_data = true; + } if (!has_data) { return absl::nullopt; } diff --git a/src/core/ext/filters/client_channel/backend_metric.cc b/src/core/ext/filters/client_channel/backend_metric.cc index f3ceb86b37d..a0504d7be1d 100644 --- a/src/core/ext/filters/client_channel/backend_metric.cc +++ b/src/core/ext/filters/client_channel/backend_metric.cc @@ -81,6 +81,11 @@ const BackendMetricData* ParseBackendMetricData( msg, xds_data_orca_v3_OrcaLoadReport_utilization_next, xds_data_orca_v3_OrcaLoadReport_UtilizationEntry_key, xds_data_orca_v3_OrcaLoadReport_UtilizationEntry_value, allocator); + backend_metric_data->named_metrics = + ParseMap( + msg, 
xds_data_orca_v3_OrcaLoadReport_named_metrics_next, + xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry_key, + xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry_value, allocator); return backend_metric_data; } diff --git a/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h b/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h index 026b785c876..49cadea0666 100644 --- a/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h +++ b/src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h @@ -44,6 +44,9 @@ struct BackendMetricData { /// are determined by the application. Each value is expressed as a /// fraction of total resources available. std::map utilization; + /// Application-specific opaque metrics. Metric names are determined by the + /// application. Each value is an opaque measurement. + std::map named_metrics; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index e0cba751e5b..1f2929108ed 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -38,6 +38,7 @@ #include #include +#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_attributes.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" @@ -332,7 +333,13 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker } // Record call completion for load reporting. 
if (locality_stats_ != nullptr) { - locality_stats_->AddCallFinished(!args.status.ok()); + auto* backend_metric_data = + args.backend_metric_accessor->GetBackendMetricData(); + const std::map* named_metrics = nullptr; + if (backend_metric_data != nullptr) { + named_metrics = &backend_metric_data->named_metrics; + } + locality_stats_->AddCallFinished(named_metrics, !args.status.ok()); } // Decrement number of calls in flight. call_counter_->Decrement(); diff --git a/src/core/ext/xds/xds_client_stats.cc b/src/core/ext/xds/xds_client_stats.cc index 1cdca73220c..512d5996b12 100644 --- a/src/core/ext/xds/xds_client_stats.cc +++ b/src/core/ext/xds/xds_client_stats.cc @@ -131,29 +131,43 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() { XdsClusterLocalityStats::Snapshot XdsClusterLocalityStats::GetSnapshotAndReset() { - Snapshot snapshot = { - GetAndResetCounter(&total_successful_requests_), - // Don't reset total_requests_in_progress because it's - // not related to a single reporting interval. - total_requests_in_progress_.load(std::memory_order_relaxed), - GetAndResetCounter(&total_error_requests_), - GetAndResetCounter(&total_issued_requests_), - {}}; - MutexLock lock(&backend_metrics_mu_); - snapshot.backend_metrics = std::move(backend_metrics_); + Snapshot snapshot; + for (auto& percpu_stats : stats_) { + Snapshot percpu_snapshot = { + GetAndResetCounter(&percpu_stats.total_successful_requests), + // Don't reset total_requests_in_progress because it's + // not related to a single reporting interval. 
+ percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed), + GetAndResetCounter(&percpu_stats.total_error_requests), + GetAndResetCounter(&percpu_stats.total_issued_requests), + {}}; + { + MutexLock lock(&percpu_stats.backend_metrics_mu); + percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics); + } + snapshot += percpu_snapshot; + } return snapshot; } void XdsClusterLocalityStats::AddCallStarted() { - total_issued_requests_.fetch_add(1, std::memory_order_relaxed); - total_requests_in_progress_.fetch_add(1, std::memory_order_relaxed); + Stats& stats = stats_.this_cpu(); + stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed); + stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed); } -void XdsClusterLocalityStats::AddCallFinished(bool fail) { +void XdsClusterLocalityStats::AddCallFinished( + const std::map* named_metrics, bool fail) { + Stats& stats = stats_.this_cpu(); std::atomic& to_increment = - fail ? total_error_requests_ : total_successful_requests_; + fail ? 
stats.total_error_requests : stats.total_successful_requests; to_increment.fetch_add(1, std::memory_order_relaxed); - total_requests_in_progress_.fetch_add(-1, std::memory_order_acq_rel); + stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel); + if (named_metrics == nullptr) return; + MutexLock lock(&stats.backend_metrics_mu); + for (const auto& m : *named_metrics) { + stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second}; + } } } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client_stats.h b/src/core/ext/xds/xds_client_stats.h index 0c5687e5f5b..d2f592e41e6 100644 --- a/src/core/ext/xds/xds_client_stats.h +++ b/src/core/ext/xds/xds_client_stats.h @@ -34,6 +34,7 @@ #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/per_cpu.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -159,8 +160,8 @@ class XdsClusterDropStats : public RefCounted { class XdsClusterLocalityStats : public RefCounted { public: struct BackendMetric { - uint64_t num_requests_finished_with_metric; - double total_metric_value; + uint64_t num_requests_finished_with_metric = 0; + double total_metric_value = 0; BackendMetric& operator+=(const BackendMetric& other) { num_requests_finished_with_metric += @@ -175,10 +176,10 @@ class XdsClusterLocalityStats : public RefCounted { }; struct Snapshot { - uint64_t total_successful_requests; - uint64_t total_requests_in_progress; - uint64_t total_error_requests; - uint64_t total_issued_requests; + uint64_t total_successful_requests = 0; + uint64_t total_requests_in_progress = 0; + uint64_t total_error_requests = 0; + uint64_t total_issued_requests = 0; std::map backend_metrics; Snapshot& operator+=(const Snapshot& other) { @@ -215,27 +216,30 @@ class XdsClusterLocalityStats : public RefCounted { Snapshot GetSnapshotAndReset(); void AddCallStarted(); - void 
AddCallFinished(bool fail = false); + void AddCallFinished(const std::map* named_metrics, + bool fail = false); private: + struct Stats { + std::atomic total_successful_requests{0}; + std::atomic total_requests_in_progress{0}; + std::atomic total_error_requests{0}; + std::atomic total_issued_requests{0}; + + // Protects backend_metrics. A mutex is necessary because the length of + // backend_metrics can be accessed by both the callback intercepting the + // call's recv_trailing_metadata and the load reporting thread. + Mutex backend_metrics_mu; + std::map backend_metrics + ABSL_GUARDED_BY(backend_metrics_mu); + }; + RefCountedPtr xds_client_; const XdsBootstrap::XdsServer& lrs_server_; absl::string_view cluster_name_; absl::string_view eds_service_name_; RefCountedPtr name_; - - std::atomic total_successful_requests_{0}; - std::atomic total_requests_in_progress_{0}; - std::atomic total_error_requests_{0}; - std::atomic total_issued_requests_{0}; - - // Protects backend_metrics_. A mutex is necessary because the length of - // backend_metrics_ can be accessed by both the callback intercepting the - // call's recv_trailing_metadata (not from the control plane work serializer) - // and the load reporting thread (from the control plane work serializer). 
- Mutex backend_metrics_mu_; - std::map backend_metrics_ - ABSL_GUARDED_BY(backend_metrics_mu_); + PerCpu stats_{32}; }; } // namespace grpc_core diff --git a/src/core/lib/gprpp/per_cpu.h b/src/core/lib/gprpp/per_cpu.h index 116c9e215ac..2476b6f370f 100644 --- a/src/core/lib/gprpp/per_cpu.h +++ b/src/core/lib/gprpp/per_cpu.h @@ -17,7 +17,9 @@ #include +#include #include +#include #include #include @@ -29,7 +31,11 @@ namespace grpc_core { template class PerCpu { public: - T& this_cpu() { return data_[ExecCtx::Get()->starting_cpu()]; } + explicit PerCpu(size_t max = std::numeric_limits::max()) + : cpus_(std::min(max, gpr_cpu_num_cores())), + data_{new T[cpus_]} {} + + T& this_cpu() { return data_[ExecCtx::Get()->starting_cpu() % cpus_]; } T* begin() { return data_.get(); } T* end() { return data_.get() + cpus_; } @@ -37,8 +43,8 @@ class PerCpu { const T* end() const { return data_.get() + cpus_; } private: - const size_t cpus_ = gpr_cpu_num_cores(); - std::unique_ptr data_{new T[cpus_]}; + const size_t cpus_; + std::unique_ptr data_; }; } // namespace grpc_core diff --git a/src/cpp/server/backend_metric_recorder.cc b/src/cpp/server/backend_metric_recorder.cc index ef79a119fcf..47a930081e8 100644 --- a/src/cpp/server/backend_metric_recorder.cc +++ b/src/cpp/server/backend_metric_recorder.cc @@ -309,6 +309,18 @@ experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric( return *this; } +experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric( + string_ref name, double value) { + internal::MutexLock lock(&mu_); + absl::string_view name_sv(name.data(), name.length()); + named_metrics_[name_sv] = value; + if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { + gpr_log(GPR_INFO, "[%p] Named metric recorded: %s %f", this, + std::string(name_sv).c_str(), value); + } + return *this; +} + BackendMetricData BackendMetricState::GetBackendMetricData() { // Merge metrics from the ServerMetricRecorder first since metrics recorded // to 
CallMetricRecorder takes a higher precedence. @@ -341,13 +353,18 @@ BackendMetricData BackendMetricState::GetBackendMetricData() { for (const auto& r : request_cost_) { data.request_cost[r.first] = r.second; } + for (const auto& r : named_metrics_) { + data.named_metrics[r.first] = r.second; + } } if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) { gpr_log(GPR_INFO, "[%p] Backend metric data returned: cpu:%f mem:%f qps:%f eps:%f " - "utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR, + "utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR + " named_metrics size:%" PRIuPTR, this, data.cpu_utilization, data.mem_utilization, data.qps, - data.eps, data.utilization.size(), data.request_cost.size()); + data.eps, data.utilization.size(), data.request_cost.size(), + data.named_metrics.size()); } return data; } diff --git a/src/cpp/server/backend_metric_recorder.h b/src/cpp/server/backend_metric_recorder.h index 402e7b53a40..7ab448a9efc 100644 --- a/src/cpp/server/backend_metric_recorder.h +++ b/src/cpp/server/backend_metric_recorder.h @@ -64,6 +64,8 @@ class BackendMetricState : public grpc_core::BackendMetricProvider, string_ref name, double value) override; experimental::CallMetricRecorder& RecordRequestCostMetric( string_ref name, double value) override; + experimental::CallMetricRecorder& RecordNamedMetric(string_ref name, + double value) override; // This clears metrics currently recorded. Don't call twice. 
grpc_core::BackendMetricData GetBackendMetricData() override; @@ -76,6 +78,7 @@ class BackendMetricState : public grpc_core::BackendMetricProvider, internal::Mutex mu_; std::map utilization_ ABSL_GUARDED_BY(mu_); std::map request_cost_ ABSL_GUARDED_BY(mu_); + std::map named_metrics_ ABSL_GUARDED_BY(mu_); }; } // namespace grpc diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 2d8f0654066..d20942bfe78 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -155,6 +155,13 @@ class MyTestServiceImpl : public TestServiceImpl { key[p.first.size()] = '\0'; recorder->RecordUtilizationMetric(key, p.second); } + for (const auto& p : request_metrics.named_metrics()) { + char* key = static_cast( + grpc_call_arena_alloc(context->c_call(), p.first.size() + 1)); + strncpy(key, p.first.data(), p.first.size()); + key[p.first.size()] = '\0'; + recorder->RecordNamedMetric(key, p.second); + } } return TestServiceImpl::Echo(context, request, response); } @@ -2357,6 +2364,10 @@ xds::data::orca::v3::OrcaLoadReport BackendMetricDataToOrcaLoadReport( std::string name(p.first); (*load_report.mutable_utilization())[name] = p.second; } + for (const auto& p : backend_metric_data.named_metrics) { + std::string name(p.first); + (*load_report.mutable_named_metrics())[name] = p.second; + } return load_report; } @@ -2593,6 +2604,12 @@ void CheckLoadReportAsExpected( ASSERT_NE(it, expected.utilization().end()); EXPECT_EQ(it->second, p.second); } + EXPECT_EQ(actual.named_metrics().size(), expected.named_metrics().size()); + for (const auto& p : actual.named_metrics()) { + auto it = expected.named_metrics().find(p.first); + ASSERT_NE(it, expected.named_metrics().end()); + EXPECT_EQ(it->second, p.second); + } } TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { @@ -2613,6 +2630,9 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) { // This will be rejected. 
utilization["out_of_range_invalid1"] = 1.1; utilization["out_of_range_invalid2"] = -1.1; + auto& named_metrics = *load_report.mutable_named_metrics(); + named_metrics["metric0"] = 3.0; + named_metrics["metric1"] = -1.0; auto expected = load_report; expected.mutable_utilization()->erase("out_of_range_invalid1"); expected.mutable_utilization()->erase("out_of_range_invalid2"); diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 461824a4488..68a1f3e873f 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -26,6 +26,7 @@ #include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/config/config_vars.h" +#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h" #include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" @@ -1375,6 +1376,18 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Values(XdsTestType().set_enable_load_reporting()), &XdsTestType::Name); +MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value, + "equals LoadMetric") { + bool match = true; + match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric, + arg.num_requests_finished_with_metric, + result_listener); + match &= + ::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value), + arg.total_metric_value, result_listener); + return match; +} + // Tests that the load report received at the balancer is correct. TEST_P(ClientLoadReportingTest, Vanilla) { CreateAndStartBackends(4); @@ -1389,11 +1402,19 @@ TEST_P(ClientLoadReportingTest, Vanilla) { size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr, WaitForBackendOptions().set_reset_counters(false)); - // Send kNumRpcsPerAddress RPCs per server. 
- CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size()); + // Send kNumRpcsPerAddress RPCs per server with named metrics. + xds::data::orca::v3::OrcaLoadReport backend_metrics; + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(), + RpcOptions().set_backend_metrics(backend_metrics)); + named_metrics["foo"] = 0.3; + named_metrics["bar"] = 0.4; for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) { CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "", - RpcOptions().set_server_fail(true)); + RpcOptions().set_server_fail(true).set_backend_metrics( + backend_metrics)); } const size_t total_successful_rpcs_sent = (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs; @@ -1425,6 +1446,8 @@ TEST_P(ClientLoadReportingTest, Vanilla) { ::testing::Pair("locality1", ::testing::_))); size_t num_successful_rpcs = 0; size_t num_failed_rpcs = 0; + std::map + named_metrics_total; for (const auto& p : client_stats.locality_stats()) { EXPECT_EQ(p.second.total_requests_in_progress, 0U); EXPECT_EQ( @@ -1432,10 +1455,30 @@ TEST_P(ClientLoadReportingTest, Vanilla) { p.second.total_successful_requests + p.second.total_error_requests); num_successful_rpcs += p.second.total_successful_requests; num_failed_rpcs += p.second.total_error_requests; + for (const auto& s : p.second.load_metrics) { + named_metrics_total[s.first] += s.second; + } } EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent); EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent); EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent); + EXPECT_THAT( + named_metrics_total, + ::testing::UnorderedElementsAre( + ::testing::Pair( + "foo", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 1.0 + + (kNumFailuresPerAddress * 
backends_.size()) * 0.3)), + ::testing::Pair( + "bar", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 2.0 + + (kNumFailuresPerAddress * backends_.size()) * 0.4)))); // The LRS service got a single request, and sent a single response. EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); @@ -1452,10 +1495,18 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) { // Wait until all backends are ready. size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION); // Send kNumRpcsPerAddress RPCs per server. - CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size()); + xds::data::orca::v3::OrcaLoadReport backend_metrics; + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(), + RpcOptions().set_backend_metrics(backend_metrics)); + named_metrics["foo"] = 0.3; + named_metrics["bar"] = 0.4; for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) { CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "", - RpcOptions().set_server_fail(true)); + RpcOptions().set_server_fail(true).set_backend_metrics( + backend_metrics)); } // Check that each backend got the right number of requests. 
for (size_t i = 0; i < backends_.size(); ++i) { @@ -1476,6 +1527,29 @@ TEST_P(ClientLoadReportingTest, SendAllClusters) { EXPECT_EQ(kNumFailuresPerAddress * backends_.size(), client_stats.total_error_requests()); EXPECT_EQ(0U, client_stats.total_dropped_requests()); + EXPECT_THAT( + client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair( + "locality0", + ::testing::Field( + &ClientStats::LocalityStats::load_metrics, + ::testing::UnorderedElementsAre( + ::testing::Pair( + "foo", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 1.0 + + (kNumFailuresPerAddress * backends_.size()) * + 0.3)), + ::testing::Pair( + "bar", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 2.0 + + (kNumFailuresPerAddress * backends_.size()) * + 0.4))))))); // The LRS service got a single request, and sent a single response. EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); @@ -1515,6 +1589,11 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { EXPECT_EQ(0U, client_stats.total_requests_in_progress()); EXPECT_EQ(0U, client_stats.total_error_requests()); EXPECT_EQ(0U, client_stats.total_dropped_requests()); + EXPECT_THAT(client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair( + "locality0", + ::testing::Field(&ClientStats::LocalityStats::load_metrics, + ::testing::IsEmpty())))); // Shut down the balancer. balancer_->Shutdown(); // We should continue using the last EDS response we received from the @@ -1537,7 +1616,12 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { // This tells us that we're now using the new serverlist. num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4); // Send one RPC per backend. 
- CheckRpcSendOk(DEBUG_LOCATION, 2); + xds::data::orca::v3::OrcaLoadReport backend_metrics; + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; + CheckRpcSendOk(DEBUG_LOCATION, 2, + RpcOptions().set_backend_metrics(backend_metrics)); num_rpcs += 2; // Check client stats. load_report = balancer_->lrs_service()->WaitForLoadReport(); @@ -1547,6 +1631,14 @@ TEST_P(ClientLoadReportingTest, BalancerRestart) { EXPECT_EQ(0U, client_stats.total_requests_in_progress()); EXPECT_EQ(0U, client_stats.total_error_requests()); EXPECT_EQ(0U, client_stats.total_dropped_requests()); + EXPECT_THAT(client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair( + "locality0", + ::testing::Field( + &ClientStats::LocalityStats::load_metrics, + ::testing::UnorderedElementsAre( + ::testing::Pair("foo", LoadMetricEq(2, 2.0)), + ::testing::Pair("bar", LoadMetricEq(2, 4.0))))))); } // Tests load reporting when switching over from one cluster to another. @@ -1600,7 +1692,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) { 0UL), ::testing::Field( &ClientStats::LocalityStats::total_issued_requests, - num_rpcs))))), + num_rpcs), + ::testing::Field( + &ClientStats::LocalityStats::load_metrics, + ::testing::IsEmpty()))))), ::testing::Property(&ClientStats::total_dropped_requests, 0UL)))); // Change RDS resource to point to new cluster. 
RouteConfiguration new_route_config = default_route_config_; @@ -1638,7 +1733,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) { 0UL), ::testing::Field(&ClientStats::LocalityStats:: total_issued_requests, - ::testing::Le(num_rpcs)))))), + ::testing::Le(num_rpcs)), + ::testing::Field( + &ClientStats::LocalityStats::load_metrics, + ::testing::IsEmpty()))))), ::testing::Property(&ClientStats::total_dropped_requests, 0UL)), ::testing::AllOf( ::testing::Property(&ClientStats::cluster_name, kNewClusterName), @@ -1660,7 +1758,10 @@ TEST_P(ClientLoadReportingTest, ChangeClusters) { 0UL), ::testing::Field(&ClientStats::LocalityStats:: total_issued_requests, - ::testing::Le(num_rpcs)))))), + ::testing::Le(num_rpcs)), + ::testing::Field( + &ClientStats::LocalityStats::load_metrics, + ::testing::IsEmpty()))))), ::testing::Property(&ClientStats::total_dropped_requests, 0UL)))); size_t total_ok = 0; for (const ClientStats& client_stats : load_report) { diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.cc b/test/cpp/end2end/xds/xds_end2end_test_lib.cc index 36344e2befb..5d6b0f77930 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.cc +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.cc @@ -435,6 +435,9 @@ void XdsEnd2endTest::RpcOptions::SetupRpc(ClientContext* context, if (skip_cancelled_check) { request->mutable_param()->set_skip_cancelled_check(true); } + if (backend_metrics.has_value()) { + *request->mutable_param()->mutable_backend_metrics() = *backend_metrics; + } } // diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index cbe31eaf3c3..156341c8676 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -44,6 +44,7 @@ #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h" +#include 
"src/proto/grpc/testing/xds/v3/orca_load_report.pb.h" #include "test/core/util/port.h" #include "test/cpp/end2end/counted_service.h" #include "test/cpp/end2end/test_service_impl.h" @@ -304,6 +305,17 @@ class XdsEnd2endTest : public ::testing::TestWithParam { last_peer_identity_.emplace_back(entry.data(), entry.size()); } } + if (request->has_param() && request->param().has_backend_metrics()) { + const auto& request_metrics = request->param().backend_metrics(); + auto* recorder = context->ExperimentalGetCallMetricRecorder(); + for (const auto& p : request_metrics.named_metrics()) { + char* key = static_cast( + grpc_call_arena_alloc(context->c_call(), p.first.size() + 1)); + strncpy(key, p.first.data(), p.first.size()); + key[p.first.size()] = '\0'; + recorder->RecordNamedMetric(key, p.second); + } + } return status; } @@ -710,6 +722,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { int client_cancel_after_us = 0; bool skip_cancelled_check = false; StatusCode server_expected_error = StatusCode::OK; + absl::optional backend_metrics; RpcOptions() {} @@ -769,6 +782,12 @@ class XdsEnd2endTest : public ::testing::TestWithParam { return *this; } + RpcOptions& set_backend_metrics( + absl::optional metrics) { + backend_metrics = std::move(metrics); + return *this; + } + // Populates context and request. void SetupRpc(ClientContext* context, EchoRequest* request) const; }; diff --git a/test/cpp/end2end/xds/xds_server.h b/test/cpp/end2end/xds/xds_server.h index fa21ad08768..8b777fc2f0d 100644 --- a/test/cpp/end2end/xds/xds_server.h +++ b/test/cpp/end2end/xds/xds_server.h @@ -612,6 +612,17 @@ class LrsServiceImpl public: // Stats for a given locality. 
struct LocalityStats { + struct LoadMetric { + uint64_t num_requests_finished_with_metric; + double total_metric_value; + LoadMetric& operator+=(const LoadMetric& other) { + num_requests_finished_with_metric += + other.num_requests_finished_with_metric; + total_metric_value += other.total_metric_value; + return *this; + } + }; + LocalityStats() {} // Converts from proto message class. @@ -625,13 +636,21 @@ class LrsServiceImpl total_error_requests( upstream_locality_stats.total_error_requests()), total_issued_requests( - upstream_locality_stats.total_issued_requests()) {} + upstream_locality_stats.total_issued_requests()) { + for (const auto& s : upstream_locality_stats.load_metric_stats()) { + load_metrics[s.metric_name()] += LoadMetric{ + s.num_requests_finished_with_metric(), s.total_metric_value()}; + } + } LocalityStats& operator+=(const LocalityStats& other) { total_successful_requests += other.total_successful_requests; total_requests_in_progress += other.total_requests_in_progress; total_error_requests += other.total_error_requests; total_issued_requests += other.total_issued_requests; + for (const auto& p : other.load_metrics) { + load_metrics[p.first] += p.second; + } return *this; } @@ -639,6 +658,7 @@ class LrsServiceImpl uint64_t total_requests_in_progress = 0; uint64_t total_error_requests = 0; uint64_t total_issued_requests = 0; + std::map load_metrics; }; ClientStats() {}