pull/37467/head
Mark D. Roth 2 months ago
parent b7c1bfd8aa
commit f45d7d87b4
  1. 33
      src/core/xds/xds_client/lrs_client.cc
  2. 318
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  3. 12
      test/cpp/end2end/xds/xds_end2end_test_lib.h

@ -1095,6 +1095,23 @@ std::string LrsClient::CreateLrsInitialRequest() {
namespace {
void MaybeAddLoadMetric(
const LrsApiContext& context,
envoy_config_endpoint_v3_UpstreamLocalityStats* output,
absl::string_view metric_name,
const LrsClient::ClusterLocalityStats::BackendMetric& backend_metric) {
if (backend_metric.IsZero()) return;
envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
output, context.arena);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
load_metric, StdStringToUpbString(metric_name));
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, backend_metric.num_requests_finished_with_metric);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
load_metric, backend_metric.total_metric_value);
}
void LocalityStatsPopulate(
const LrsApiContext& context,
envoy_config_endpoint_v3_UpstreamLocalityStats* output,
@ -1126,19 +1143,17 @@ void LocalityStatsPopulate(
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add backend metrics.
MaybeAddLoadMetric(context, output, "cpu_utilization",
snapshot.cpu_utilization);
MaybeAddLoadMetric(context, output, "mem_utilization",
snapshot.mem_utilization);
MaybeAddLoadMetric(context, output, "application_utilization",
snapshot.application_utilization);
for (const auto& p : snapshot.backend_metrics) {
const std::string& metric_name = p.first;
const LrsClient::ClusterLocalityStats::BackendMetric& metric_value =
p.second;
envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
output, context.arena);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
load_metric, StdStringToUpbString(metric_name));
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
load_metric, metric_value.num_requests_finished_with_metric);
envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
load_metric, metric_value.total_metric_value);
MaybeAddLoadMetric(context, output, metric_name, metric_value);
}
}

@ -1724,6 +1724,324 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests ORCA to LRS propagation.
TEST_P(ClientLoadReportingTest, OrcaPropagation) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
Cluster cluster = default_cluster_;
cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
cluster.add_lrs_report_endpoint_metrics("mem_utilization");
cluster.add_lrs_report_endpoint_metrics("application_utilization");
cluster.add_lrs_report_endpoint_metrics("unknown_field");
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0; // Not propagated.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4; // Not propagated.
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(
named_metrics_total,
::testing::UnorderedElementsAre(
::testing::Pair(
"named_metrics.foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.3)),
::testing::Pair(
"cpu_utilization",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.8 +
(kNumFailuresPerAddress * backends_.size()) * 0.4)),
::testing::Pair(
"mem_utilization",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.6 +
(kNumFailuresPerAddress * backends_.size()) * 0.3)),
::testing::Pair(
"application_utilization",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.4 +
(kNumFailuresPerAddress * backends_.size()) * 0.2))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
TEST_P(ClientLoadReportingTest, OrcaPropagationNamedMetricsAll) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
Cluster cluster = default_cluster_;
cluster.add_lrs_report_endpoint_metrics("named_metrics.*");
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(
named_metrics_total,
::testing::UnorderedElementsAre(
::testing::Pair(
"named_metrics.foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.3)),
::testing::Pair(
"named_metrics.bar",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 2.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.4))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
TEST_P(ClientLoadReportingTest, OrcaPropagationNotConfigured) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(named_metrics_total, ::testing::UnorderedElementsAre());
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests send_all_clusters.
TEST_P(ClientLoadReportingTest, SendAllClusters) {
CreateAndStartBackends(2);

@ -304,6 +304,18 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
if (request->has_param() && request->param().has_backend_metrics()) {
const auto& request_metrics = request->param().backend_metrics();
auto* recorder = context->ExperimentalGetCallMetricRecorder();
if (request_metrics.cpu_utilization() != 0) {
recorder->RecordCpuUtilizationMetric(
request_metrics.cpu_utilization());
}
if (request_metrics.mem_utilization() != 0) {
recorder->RecordMemoryUtilizationMetric(
request_metrics.mem_utilization());
}
if (request_metrics.application_utilization() != 0) {
recorder->RecordApplicationUtilizationMetric(
request_metrics.application_utilization());
}
for (const auto& p : request_metrics.named_metrics()) {
char* key = static_cast<char*>(
grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));

Loading…
Cancel
Save