From f45d7d87b49eabe670ab6d7183aef078dafaa983 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 19 Sep 2024 20:52:29 +0000 Subject: [PATCH] e2e tests --- src/core/xds/xds_client/lrs_client.cc | 33 +- .../end2end/xds/xds_cluster_end2end_test.cc | 318 ++++++++++++++++++ test/cpp/end2end/xds/xds_end2end_test_lib.h | 12 + 3 files changed, 354 insertions(+), 9 deletions(-) diff --git a/src/core/xds/xds_client/lrs_client.cc b/src/core/xds/xds_client/lrs_client.cc index c3d5b9a8bcb..9ba2cf70c59 100644 --- a/src/core/xds/xds_client/lrs_client.cc +++ b/src/core/xds/xds_client/lrs_client.cc @@ -1095,6 +1095,23 @@ std::string LrsClient::CreateLrsInitialRequest() { namespace { +void MaybeAddLoadMetric( + const LrsApiContext& context, + envoy_config_endpoint_v3_UpstreamLocalityStats* output, + absl::string_view metric_name, + const LrsClient::ClusterLocalityStats::BackendMetric& backend_metric) { + if (backend_metric.IsZero()) return; + envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric = + envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats( + output, context.arena); + envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name( + load_metric, StdStringToUpbString(metric_name)); + envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric( + load_metric, backend_metric.num_requests_finished_with_metric); + envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value( + load_metric, backend_metric.total_metric_value); +} + void LocalityStatsPopulate( const LrsApiContext& context, envoy_config_endpoint_v3_UpstreamLocalityStats* output, @@ -1126,19 +1143,17 @@ void LocalityStatsPopulate( envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests( output, snapshot.total_issued_requests); // Add backend metrics. + MaybeAddLoadMetric(context, output, "cpu_utilization", + snapshot.cpu_utilization); + MaybeAddLoadMetric(context, output, "mem_utilization", + snapshot.mem_utilization); + MaybeAddLoadMetric(context, output, "application_utilization", + snapshot.application_utilization); for (const auto& p : snapshot.backend_metrics) { const std::string& metric_name = p.first; const LrsClient::ClusterLocalityStats::BackendMetric& metric_value = p.second; - envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric = - envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats( - output, context.arena); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name( - load_metric, StdStringToUpbString(metric_name)); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric( - load_metric, metric_value.num_requests_finished_with_metric); - envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value( - load_metric, metric_value.total_metric_value); + MaybeAddLoadMetric(context, output, metric_name, metric_value); } } diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 24398adf7f3..47b683c9059 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -1724,6 +1724,324 @@ TEST_P(ClientLoadReportingTest, Vanilla) { EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); } +// Tests ORCA to LRS propagation. +TEST_P(ClientLoadReportingTest, OrcaPropagation) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION"); + CreateAndStartBackends(4); + const size_t kNumRpcsPerAddress = 10; + const size_t kNumFailuresPerAddress = 3; + Cluster cluster = default_cluster_; + cluster.add_lrs_report_endpoint_metrics("named_metrics.foo"); + cluster.add_lrs_report_endpoint_metrics("cpu_utilization"); + cluster.add_lrs_report_endpoint_metrics("mem_utilization"); + cluster.add_lrs_report_endpoint_metrics("application_utilization"); + cluster.add_lrs_report_endpoint_metrics("unknown_field"); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 2)}, + {"locality1", CreateEndpointsForBackends(2, 4)}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Wait until all backends are ready. + size_t num_warmup_rpcs = + WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr, + WaitForBackendOptions().set_reset_counters(false)); + // Send kNumRpcsPerAddress RPCs per server with named metrics. + xds::data::orca::v3::OrcaLoadReport backend_metrics; + backend_metrics.set_cpu_utilization(0.8); + backend_metrics.set_mem_utilization(0.6); + backend_metrics.set_application_utilization(0.4); + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; // Not propagated. + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(), + RpcOptions().set_backend_metrics(backend_metrics)); + backend_metrics.set_cpu_utilization(0.4); + backend_metrics.set_mem_utilization(0.3); + backend_metrics.set_application_utilization(0.2); + named_metrics["foo"] = 0.3; + named_metrics["bar"] = 0.4; // Not propagated. + for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) { + CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "", + RpcOptions().set_server_fail(true).set_backend_metrics( + backend_metrics)); + } + const size_t total_successful_rpcs_sent = + (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs; + const size_t total_failed_rpcs_sent = + kNumFailuresPerAddress * backends_.size(); + // Check that the backends got the right number of requests. + size_t total_rpcs_sent = 0; + for (const auto& backend : backends_) { + total_rpcs_sent += backend->backend_service()->request_count(); + } + EXPECT_EQ(total_rpcs_sent, + total_successful_rpcs_sent + total_failed_rpcs_sent); + // The load report received at the balancer should be correct. + std::vector load_report = + balancer_->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats& client_stats = load_report.front(); + EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName); + EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName); + EXPECT_EQ(total_successful_rpcs_sent, + client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); + EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests()); + EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); + ASSERT_THAT( + client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_), + ::testing::Pair("locality1", ::testing::_))); + size_t num_successful_rpcs = 0; + size_t num_failed_rpcs = 0; + std::map + named_metrics_total; + for (const auto& p : client_stats.locality_stats()) { + EXPECT_EQ(p.second.total_requests_in_progress, 0U); + EXPECT_EQ( + p.second.total_issued_requests, + p.second.total_successful_requests + p.second.total_error_requests); + num_successful_rpcs += p.second.total_successful_requests; + num_failed_rpcs += p.second.total_error_requests; + for (const auto& s : p.second.load_metrics) { + named_metrics_total[s.first] += s.second; + } + } + EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent); + EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent); + EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent); + EXPECT_THAT( + named_metrics_total, + ::testing::UnorderedElementsAre( + ::testing::Pair( + "named_metrics.foo", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 1.0 + + (kNumFailuresPerAddress * backends_.size()) * 0.3)), + ::testing::Pair( + "cpu_utilization", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 0.8 + + (kNumFailuresPerAddress * backends_.size()) * 0.4)), + ::testing::Pair( + "mem_utilization", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 0.6 + + (kNumFailuresPerAddress * backends_.size()) * 0.3)), + ::testing::Pair( + "application_utilization", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 0.4 + + (kNumFailuresPerAddress * backends_.size()) * 0.2)))); + // The LRS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); + EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); +} + +TEST_P(ClientLoadReportingTest, OrcaPropagationNamedMetricsAll) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION"); + CreateAndStartBackends(4); + const size_t kNumRpcsPerAddress = 10; + const size_t kNumFailuresPerAddress = 3; + Cluster cluster = default_cluster_; + cluster.add_lrs_report_endpoint_metrics("named_metrics.*"); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 2)}, + {"locality1", CreateEndpointsForBackends(2, 4)}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Wait until all backends are ready. + size_t num_warmup_rpcs = + WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr, + WaitForBackendOptions().set_reset_counters(false)); + // Send kNumRpcsPerAddress RPCs per server with named metrics. + xds::data::orca::v3::OrcaLoadReport backend_metrics; + backend_metrics.set_cpu_utilization(0.8); + backend_metrics.set_mem_utilization(0.6); + backend_metrics.set_application_utilization(0.4); + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(), + RpcOptions().set_backend_metrics(backend_metrics)); + backend_metrics.set_cpu_utilization(0.4); + backend_metrics.set_mem_utilization(0.3); + backend_metrics.set_application_utilization(0.2); + named_metrics["foo"] = 0.3; + named_metrics["bar"] = 0.4; + for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) { + CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "", + RpcOptions().set_server_fail(true).set_backend_metrics( + backend_metrics)); + } + const size_t total_successful_rpcs_sent = + (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs; + const size_t total_failed_rpcs_sent = + kNumFailuresPerAddress * backends_.size(); + // Check that the backends got the right number of requests. + size_t total_rpcs_sent = 0; + for (const auto& backend : backends_) { + total_rpcs_sent += backend->backend_service()->request_count(); + } + EXPECT_EQ(total_rpcs_sent, + total_successful_rpcs_sent + total_failed_rpcs_sent); + // The load report received at the balancer should be correct. + std::vector load_report = + balancer_->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats& client_stats = load_report.front(); + EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName); + EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName); + EXPECT_EQ(total_successful_rpcs_sent, + client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); + EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests()); + EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); + ASSERT_THAT( + client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_), + ::testing::Pair("locality1", ::testing::_))); + size_t num_successful_rpcs = 0; + size_t num_failed_rpcs = 0; + std::map + named_metrics_total; + for (const auto& p : client_stats.locality_stats()) { + EXPECT_EQ(p.second.total_requests_in_progress, 0U); + EXPECT_EQ( + p.second.total_issued_requests, + p.second.total_successful_requests + p.second.total_error_requests); + num_successful_rpcs += p.second.total_successful_requests; + num_failed_rpcs += p.second.total_error_requests; + for (const auto& s : p.second.load_metrics) { + named_metrics_total[s.first] += s.second; + } + } + EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent); + EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent); + EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent); + EXPECT_THAT( + named_metrics_total, + ::testing::UnorderedElementsAre( + ::testing::Pair( + "named_metrics.foo", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 1.0 + + (kNumFailuresPerAddress * backends_.size()) * 0.3)), + ::testing::Pair( + "named_metrics.bar", + LoadMetricEq( + (kNumRpcsPerAddress + kNumFailuresPerAddress) * + backends_.size(), + (kNumRpcsPerAddress * backends_.size()) * 2.0 + + (kNumFailuresPerAddress * backends_.size()) * 0.4)))); + // The LRS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); + EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); +} + +TEST_P(ClientLoadReportingTest, OrcaPropagationNotConfigured) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION"); + CreateAndStartBackends(4); + const size_t kNumRpcsPerAddress = 10; + const size_t kNumFailuresPerAddress = 3; + EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 2)}, + {"locality1", CreateEndpointsForBackends(2, 4)}, + }); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // Wait until all backends are ready. + size_t num_warmup_rpcs = + WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr, + WaitForBackendOptions().set_reset_counters(false)); + // Send kNumRpcsPerAddress RPCs per server with named metrics. + xds::data::orca::v3::OrcaLoadReport backend_metrics; + backend_metrics.set_cpu_utilization(0.8); + backend_metrics.set_mem_utilization(0.6); + backend_metrics.set_application_utilization(0.4); + auto& named_metrics = (*backend_metrics.mutable_named_metrics()); + named_metrics["foo"] = 1.0; + named_metrics["bar"] = 2.0; + CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(), + RpcOptions().set_backend_metrics(backend_metrics)); + backend_metrics.set_cpu_utilization(0.4); + backend_metrics.set_mem_utilization(0.3); + backend_metrics.set_application_utilization(0.2); + named_metrics["foo"] = 0.3; + named_metrics["bar"] = 0.4; + for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) { + CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "", + RpcOptions().set_server_fail(true).set_backend_metrics( + backend_metrics)); + } + const size_t total_successful_rpcs_sent = + (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs; + const size_t total_failed_rpcs_sent = + kNumFailuresPerAddress * backends_.size(); + // Check that the backends got the right number of requests. + size_t total_rpcs_sent = 0; + for (const auto& backend : backends_) { + total_rpcs_sent += backend->backend_service()->request_count(); + } + EXPECT_EQ(total_rpcs_sent, + total_successful_rpcs_sent + total_failed_rpcs_sent); + // The load report received at the balancer should be correct. + std::vector load_report = + balancer_->lrs_service()->WaitForLoadReport(); + ASSERT_EQ(load_report.size(), 1UL); + ClientStats& client_stats = load_report.front(); + EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName); + EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName); + EXPECT_EQ(total_successful_rpcs_sent, + client_stats.total_successful_requests()); + EXPECT_EQ(0U, client_stats.total_requests_in_progress()); + EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests()); + EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests()); + EXPECT_EQ(0U, client_stats.total_dropped_requests()); + ASSERT_THAT( + client_stats.locality_stats(), + ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_), + ::testing::Pair("locality1", ::testing::_))); + size_t num_successful_rpcs = 0; + size_t num_failed_rpcs = 0; + std::map + named_metrics_total; + for (const auto& p : client_stats.locality_stats()) { + EXPECT_EQ(p.second.total_requests_in_progress, 0U); + EXPECT_EQ( + p.second.total_issued_requests, + p.second.total_successful_requests + p.second.total_error_requests); + num_successful_rpcs += p.second.total_successful_requests; + num_failed_rpcs += p.second.total_error_requests; + for (const auto& s : p.second.load_metrics) { + named_metrics_total[s.first] += s.second; + } + } + EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent); + EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent); + EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent); + EXPECT_THAT(named_metrics_total, ::testing::UnorderedElementsAre()); + // The LRS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancer_->lrs_service()->request_count()); + EXPECT_EQ(1U, balancer_->lrs_service()->response_count()); +} + // Tests send_all_clusters. TEST_P(ClientLoadReportingTest, SendAllClusters) { CreateAndStartBackends(2); diff --git a/test/cpp/end2end/xds/xds_end2end_test_lib.h b/test/cpp/end2end/xds/xds_end2end_test_lib.h index 5183e9cd562..02e3fcbe50d 100644 --- a/test/cpp/end2end/xds/xds_end2end_test_lib.h +++ b/test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -304,6 +304,18 @@ class XdsEnd2endTest : public ::testing::TestWithParam, if (request->has_param() && request->param().has_backend_metrics()) { const auto& request_metrics = request->param().backend_metrics(); auto* recorder = context->ExperimentalGetCallMetricRecorder(); + if (request_metrics.cpu_utilization() != 0) { + recorder->RecordCpuUtilizationMetric( + request_metrics.cpu_utilization()); + } + if (request_metrics.mem_utilization() != 0) { + recorder->RecordMemoryUtilizationMetric( + request_metrics.mem_utilization()); + } + if (request_metrics.application_utilization() != 0) { + recorder->RecordApplicationUtilizationMetric( + request_metrics.application_utilization()); + } for (const auto& p : request_metrics.named_metrics()) { char* key = static_cast( grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));