[xDS] ORCA to LRS propagation changes (#37467)

Implements gRFC A85 (https://github.com/grpc/proposal/pull/454).

Closes #37467

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37467 from markdroth:orca_lrs_propagation_changes 0c1e889bb7
PiperOrigin-RevId: 679646192
pull/37810/head
Mark D. Roth 2 months ago committed by Copybara-Service
parent f2e3636303
commit 6ba3b4a717
  1. 2
      BUILD
  2. 1
      CMakeLists.txt
  3. 1
      Makefile
  4. 2
      Package.swift
  5. 2
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      package.xml
  12. 24
      src/core/BUILD
  13. 13
      src/core/load_balancing/xds/xds_cluster_impl.cc
  14. 5
      src/core/xds/grpc/xds_cluster.cc
  15. 15
      src/core/xds/grpc/xds_cluster.h
  16. 27
      src/core/xds/grpc/xds_cluster_parser.cc
  17. 145
      src/core/xds/xds_client/lrs_client.cc
  18. 63
      src/core/xds/xds_client/lrs_client.h
  19. 64
      src/core/xds/xds_client/xds_backend_metric_propagation.cc
  20. 60
      src/core/xds/xds_client/xds_backend_metric_propagation.h
  21. 15
      src/proto/grpc/testing/xds/v3/cluster.proto
  22. 25
      src/proto/grpc/testing/xds/v3/load_report.proto
  23. 1
      src/python/grpcio/grpc_core_dependencies.py
  24. 88
      test/core/xds/xds_cluster_resource_type_test.cc
  25. 319
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  26. 12
      test/cpp/end2end/xds/xds_end2end_test_lib.h
  27. 30
      test/cpp/end2end/xds/xds_server.h
  28. 2
      tools/doxygen/Doxyfile.c++.internal
  29. 2
      tools/doxygen/Doxyfile.core.internal

@ -4462,6 +4462,7 @@ grpc_cc_library(
"//src/core:default_event_engine",
"//src/core:dual_ref_counted",
"//src/core:env",
"//src/core:grpc_backend_metric_data",
"//src/core:json",
"//src/core:per_cpu",
"//src/core:ref_counted",
@ -4469,6 +4470,7 @@ grpc_cc_library(
"//src/core:time",
"//src/core:upb_utils",
"//src/core:useful",
"//src/core:xds_backend_metric_propagation",
],
)

1
CMakeLists.txt generated

@ -2665,6 +2665,7 @@ add_library(grpc
src/core/xds/grpc/xds_transport_grpc.cc
src/core/xds/xds_client/lrs_client.cc
src/core/xds/xds_client/xds_api.cc
src/core/xds/xds_client/xds_backend_metric_propagation.cc
src/core/xds/xds_client/xds_bootstrap.cc
src/core/xds/xds_client/xds_client.cc
)

1
Makefile generated

@ -1517,6 +1517,7 @@ LIBGRPC_SRC = \
src/core/xds/grpc/xds_transport_grpc.cc \
src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_backend_metric_propagation.cc \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_client.cc \
third_party/abseil-cpp/absl/base/internal/cycleclock.cc \

2
Package.swift generated

@ -2036,6 +2036,8 @@ let package = Package(
"src/core/xds/xds_client/lrs_client.h",
"src/core/xds/xds_client/xds_api.cc",
"src/core/xds/xds_client/xds_api.h",
"src/core/xds/xds_client/xds_backend_metric_propagation.cc",
"src/core/xds/xds_client/xds_backend_metric_propagation.h",
"src/core/xds/xds_client/xds_bootstrap.cc",
"src/core/xds/xds_client/xds_bootstrap.h",
"src/core/xds/xds_client/xds_channel_args.h",

@ -1259,6 +1259,7 @@ libs:
- src/core/xds/grpc/xds_transport_grpc.h
- src/core/xds/xds_client/lrs_client.h
- src/core/xds/xds_client/xds_api.h
- src/core/xds/xds_client/xds_backend_metric_propagation.h
- src/core/xds/xds_client/xds_bootstrap.h
- src/core/xds/xds_client/xds_channel_args.h
- src/core/xds/xds_client/xds_client.h
@ -2080,6 +2081,7 @@ libs:
- src/core/xds/grpc/xds_transport_grpc.cc
- src/core/xds/xds_client/lrs_client.cc
- src/core/xds/xds_client/xds_api.cc
- src/core/xds/xds_client/xds_backend_metric_propagation.cc
- src/core/xds/xds_client/xds_bootstrap.cc
- src/core/xds/xds_client/xds_client.cc
deps:

1
config.m4 generated

@ -892,6 +892,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/xds/grpc/xds_transport_grpc.cc \
src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_backend_metric_propagation.cc \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_client.cc \
src/php/ext/grpc/byte_buffer.c \

1
config.w32 generated

@ -857,6 +857,7 @@ if (PHP_GRPC != "no") {
"src\\core\\xds\\grpc\\xds_transport_grpc.cc " +
"src\\core\\xds\\xds_client\\lrs_client.cc " +
"src\\core\\xds\\xds_client\\xds_api.cc " +
"src\\core\\xds\\xds_client\\xds_backend_metric_propagation.cc " +
"src\\core\\xds\\xds_client\\xds_bootstrap.cc " +
"src\\core\\xds\\xds_client\\xds_client.cc " +
"src\\php\\ext\\grpc\\byte_buffer.c " +

2
gRPC-C++.podspec generated

@ -1373,6 +1373,7 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_transport_grpc.h',
'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_backend_metric_propagation.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',
@ -2675,6 +2676,7 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_transport_grpc.h',
'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_backend_metric_propagation.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',

3
gRPC-Core.podspec generated

@ -2152,6 +2152,8 @@ Pod::Spec.new do |s|
'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.cc',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_backend_metric_propagation.cc',
'src/core/xds/xds_client/xds_backend_metric_propagation.h',
'src/core/xds/xds_client/xds_bootstrap.cc',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
@ -3460,6 +3462,7 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_transport_grpc.h',
'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_backend_metric_propagation.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',

2
grpc.gemspec generated

@ -2038,6 +2038,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/xds/xds_client/lrs_client.h )
s.files += %w( src/core/xds/xds_client/xds_api.cc )
s.files += %w( src/core/xds/xds_client/xds_api.h )
s.files += %w( src/core/xds/xds_client/xds_backend_metric_propagation.cc )
s.files += %w( src/core/xds/xds_client/xds_backend_metric_propagation.h )
s.files += %w( src/core/xds/xds_client/xds_bootstrap.cc )
s.files += %w( src/core/xds/xds_client/xds_bootstrap.h )
s.files += %w( src/core/xds/xds_client/xds_channel_args.h )

2
package.xml generated

@ -2020,6 +2020,8 @@
<file baseinstalldir="/" name="src/core/xds/xds_client/lrs_client.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_api.cc" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_api.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_backend_metric_propagation.cc" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_backend_metric_propagation.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_bootstrap.cc" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_bootstrap.h" role="src" />
<file baseinstalldir="/" name="src/core/xds/xds_client/xds_channel_args.h" role="src" />

@ -5619,6 +5619,7 @@ grpc_cc_library(
"json_writer",
"match",
"time",
"xds_backend_metric_propagation",
"xds_common_types",
"xds_health_status",
"xds_metadata",
@ -5651,6 +5652,28 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "xds_backend_metric_propagation",
srcs = [
"xds/xds_client/xds_backend_metric_propagation.cc",
],
hdrs = [
"xds/xds_client/xds_backend_metric_propagation.h",
],
external_deps = [
"absl/container:flat_hash_set",
"absl/strings",
],
language = "c++",
tags = ["nofixdeps"],
visibility = ["@grpc:xds_client_core"],
deps = [
"ref_counted",
"useful",
"//:ref_counted_ptr",
],
)
# TODO(roth): Split this up into individual targets.
grpc_cc_library(
name = "grpc_xds_client",
@ -5827,6 +5850,7 @@ grpc_cc_library(
"upb_utils",
"useful",
"validation_errors",
"xds_backend_metric_propagation",
"xds_certificate_provider",
"xds_certificate_provider_store",
"xds_cluster",

@ -362,13 +362,9 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final
}
// Record call completion for load reporting.
if (locality_stats_ != nullptr) {
auto* backend_metric_data =
args.backend_metric_accessor->GetBackendMetricData();
const std::map<absl::string_view, double>* named_metrics = nullptr;
if (backend_metric_data != nullptr) {
named_metrics = &backend_metric_data->named_metrics;
}
locality_stats_->AddCallFinished(named_metrics, !args.status.ok());
locality_stats_->AddCallFinished(
args.backend_metric_accessor->GetBackendMetricData(),
!args.status.ok());
}
// Decrement number of calls in flight.
call_counter_->Decrement();
@ -826,7 +822,8 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
parent()->xds_client_->lrs_client().AddClusterLocalityStats(
parent()->cluster_resource_->lrs_load_reporting_server,
parent()->config_->cluster_name(),
GetEdsResourceName(*parent()->cluster_resource_), locality_name);
GetEdsResourceName(*parent()->cluster_resource_), locality_name,
parent()->cluster_resource_->lrs_backend_metric_propagation);
if (locality_stats == nullptr) {
LOG(ERROR)
<< "[xds_cluster_impl_lb " << parent()

@ -53,6 +53,11 @@ std::string XdsClusterResource::ToString() const {
contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
lrs_load_reporting_server->server_uri()));
}
if (lrs_backend_metric_propagation != nullptr) {
contents.push_back(
absl::StrCat("lrs_backend_metric_propagation=",
lrs_backend_metric_propagation->AsString()));
}
if (!common_tls_context.Empty()) {
contents.push_back(
absl::StrCat("common_tls_context=", common_tls_context.ToString()));

@ -30,6 +30,7 @@
#include "src/core/xds/grpc/xds_health_status.h"
#include "src/core/xds/grpc/xds_metadata.h"
#include "src/core/xds/grpc/xds_server_grpc.h"
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
#include "src/core/xds/xds_client/xds_resource_type.h"
#include "src/core/xds/xds_client/xds_resource_type_impl.h"
@ -44,6 +45,15 @@ inline bool LrsServersEqual(
return *lrs_server1 == *lrs_server2;
}
inline bool LrsBackendMetricPropagationEqual(
const RefCountedPtr<const BackendMetricPropagation>& p1,
const RefCountedPtr<const BackendMetricPropagation>& p2) {
if (p1 == nullptr) return p2 == nullptr;
if (p2 == nullptr) return false;
// Neither one is null, so compare them.
return *p1 == *p2;
}
struct XdsClusterResource : public XdsResourceType::ResourceData {
struct Eds {
// If empty, defaults to the cluster name.
@ -82,6 +92,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
// The LRS server to use for load reporting.
// If null, load reporting will be disabled.
std::shared_ptr<const GrpcXdsServer> lrs_load_reporting_server;
// The set of metrics to propagate from ORCA to LRS.
RefCountedPtr<const BackendMetricPropagation> lrs_backend_metric_propagation;
// Tls Context used by clients
CommonTlsContext common_tls_context;
@ -103,6 +115,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
return type == other.type && lb_policy_config == other.lb_policy_config &&
LrsServersEqual(lrs_load_reporting_server,
other.lrs_load_reporting_server) &&
LrsBackendMetricPropagationEqual(
lrs_backend_metric_propagation,
other.lrs_backend_metric_propagation) &&
common_tls_context == other.common_tls_context &&
connection_idle_timeout == other.connection_idle_timeout &&
max_concurrent_requests == other.max_concurrent_requests &&

@ -59,6 +59,8 @@
#include "src/core/xds/grpc/xds_common_types_parser.h"
#include "src/core/xds/grpc/xds_lb_policy_registry.h"
#include "src/core/xds/grpc/xds_metadata_parser.h"
#include "src/core/xds/xds_client/lrs_client.h"
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
namespace grpc_core {
@ -457,6 +459,31 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
cds_update->lrs_load_reporting_server = std::make_shared<GrpcXdsServer>(
static_cast<const GrpcXdsServer&>(context.server));
}
// Record LRS metric propagation.
auto propagation = MakeRefCounted<BackendMetricPropagation>();
if (XdsOrcaLrsPropagationChangesEnabled()) {
size_t size;
upb_StringView const* metrics =
envoy_config_cluster_v3_Cluster_lrs_report_endpoint_metrics(cluster,
&size);
for (size_t i = 0; i < size; ++i) {
absl::string_view metric_name = UpbStringToAbsl(metrics[i]);
if (metric_name == "cpu_utilization") {
propagation->propagation_bits |= propagation->kCpuUtilization;
} else if (metric_name == "mem_utilization") {
propagation->propagation_bits |= propagation->kMemUtilization;
} else if (metric_name == "application_utilization") {
propagation->propagation_bits |= propagation->kApplicationUtilization;
} else if (absl::ConsumePrefix(&metric_name, "named_metrics.")) {
if (metric_name == "*") {
propagation->propagation_bits |= propagation->kNamedMetricsAll;
} else {
propagation->named_metric_keys.emplace(metric_name);
}
}
}
}
cds_update->lrs_backend_metric_propagation = std::move(propagation);
// Protocol options.
auto* upstream_config =
envoy_config_cluster_v3_Cluster_upstream_config(cluster);

@ -42,8 +42,10 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/util/backoff.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/env.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/string.h"
#include "src/core/util/sync.h"
#include "src/core/util/upb_utils.h"
#include "src/core/util/uri.h"
@ -61,6 +63,15 @@ namespace grpc_core {
using ::grpc_event_engine::experimental::EventEngine;
// TODO(roth): Remove this once the feature passes interop tests.
bool XdsOrcaLrsPropagationChangesEnabled() {
auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
if (!value.has_value()) return false;
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
return parse_succeeded && parsed_value;
}
namespace {
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
@ -124,7 +135,8 @@ void LrsClient::ClusterDropStats::AddCallDropped(const std::string& category) {
LrsClient::ClusterLocalityStats::ClusterLocalityStats(
RefCountedPtr<LrsClient> lrs_client, absl::string_view lrs_server,
absl::string_view cluster_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name)
RefCountedPtr<XdsLocalityName> name,
RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
? "ClusterLocalityStats"
: nullptr),
@ -132,13 +144,14 @@ LrsClient::ClusterLocalityStats::ClusterLocalityStats(
lrs_server_(lrs_server),
cluster_name_(cluster_name),
eds_service_name_(eds_service_name),
name_(std::move(name)) {
name_(std::move(name)),
backend_metric_propagation_(std::move(backend_metric_propagation)) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[lrs_client " << lrs_client_.get() << "] created locality stats "
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << ", "
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
<< "}";
<< ", propagation=" << backend_metric_propagation_->AsString() << "}";
}
LrsClient::ClusterLocalityStats::~ClusterLocalityStats() {
@ -147,9 +160,10 @@ LrsClient::ClusterLocalityStats::~ClusterLocalityStats() {
<< this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
<< eds_service_name_ << ", "
<< (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
<< "}";
<< ", propagation=" << backend_metric_propagation_->AsString() << "}";
lrs_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
eds_service_name_, name_, this);
eds_service_name_, name_,
backend_metric_propagation_, this);
lrs_client_.reset(DEBUG_LOCATION, "ClusterLocalityStats");
}
@ -164,9 +178,16 @@ LrsClient::ClusterLocalityStats::GetSnapshotAndReset() {
percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
GetAndResetCounter(&percpu_stats.total_error_requests),
GetAndResetCounter(&percpu_stats.total_issued_requests),
{},
{},
{},
{}};
{
MutexLock lock(&percpu_stats.backend_metrics_mu);
percpu_snapshot.cpu_utilization = std::move(percpu_stats.cpu_utilization);
percpu_snapshot.mem_utilization = std::move(percpu_stats.mem_utilization);
percpu_snapshot.application_utilization =
std::move(percpu_stats.application_utilization);
percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
}
snapshot += percpu_snapshot;
@ -181,16 +202,44 @@ void LrsClient::ClusterLocalityStats::AddCallStarted() {
}
void LrsClient::ClusterLocalityStats::AddCallFinished(
const std::map<absl::string_view, double>* named_metrics, bool fail) {
const BackendMetricData* backend_metrics, bool fail) {
Stats& stats = stats_.this_cpu();
std::atomic<uint64_t>& to_increment =
fail ? stats.total_error_requests : stats.total_successful_requests;
to_increment.fetch_add(1, std::memory_order_relaxed);
stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
if (named_metrics == nullptr) return;
if (backend_metrics == nullptr) return;
MutexLock lock(&stats.backend_metrics_mu);
for (const auto& m : *named_metrics) {
stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second};
if (!XdsOrcaLrsPropagationChangesEnabled()) {
for (const auto& m : backend_metrics->named_metrics) {
stats.backend_metrics[std::string(m.first)] += BackendMetric(1, m.second);
}
return;
}
if (backend_metric_propagation_->propagation_bits &
BackendMetricPropagation::kCpuUtilization) {
stats.cpu_utilization += BackendMetric(1, backend_metrics->cpu_utilization);
}
if (backend_metric_propagation_->propagation_bits &
BackendMetricPropagation::kMemUtilization) {
stats.mem_utilization += BackendMetric(1, backend_metrics->mem_utilization);
}
if (backend_metric_propagation_->propagation_bits &
BackendMetricPropagation::kApplicationUtilization) {
stats.application_utilization +=
BackendMetric(1, backend_metrics->application_utilization);
}
if (backend_metric_propagation_->propagation_bits &
BackendMetricPropagation::kNamedMetricsAll ||
!backend_metric_propagation_->named_metric_keys.empty()) {
for (const auto& m : backend_metrics->named_metrics) {
if (backend_metric_propagation_->propagation_bits &
BackendMetricPropagation::kNamedMetricsAll ||
backend_metric_propagation_->named_metric_keys.contains(m.first)) {
stats.backend_metrics[absl::StrCat("named_metrics.", m.first)] +=
BackendMetric(1, m.second);
}
}
}
}
@ -825,7 +874,8 @@ RefCountedPtr<LrsClient::ClusterLocalityStats>
LrsClient::AddClusterLocalityStats(
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
absl::string_view cluster_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality) {
RefCountedPtr<XdsLocalityName> locality,
RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation) {
auto key =
std::make_pair(std::string(cluster_name), std::string(eds_service_name));
RefCountedPtr<ClusterLocalityStats> cluster_locality_stats;
@ -847,20 +897,22 @@ LrsClient::AddClusterLocalityStats(
LoadReportState& load_report_state = load_report_it->second;
LoadReportState::LocalityState& locality_state =
load_report_state.locality_stats[locality];
if (locality_state.locality_stats != nullptr) {
cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
ClusterLocalityStats*& locality_stats =
locality_state.propagation_stats[backend_metric_propagation];
if (locality_stats != nullptr) {
cluster_locality_stats = locality_stats->RefIfNonZero();
}
if (cluster_locality_stats == nullptr) {
if (locality_state.locality_stats != nullptr) {
if (locality_stats != nullptr) {
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
locality_stats->GetSnapshotAndReset();
}
cluster_locality_stats = MakeRefCounted<ClusterLocalityStats>(
Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*lrs_server*/,
load_report_it->first.first /*cluster_name*/,
load_report_it->first.second /*eds_service_name*/,
std::move(locality));
locality_state.locality_stats = cluster_locality_stats.get();
std::move(locality), std::move(backend_metric_propagation));
locality_stats = cluster_locality_stats.get();
}
server_it->second.lrs_channel->MaybeStartLrsCall();
}
@ -871,6 +923,8 @@ void LrsClient::RemoveClusterLocalityStats(
absl::string_view lrs_server_key, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
const RefCountedPtr<const BackendMetricPropagation>&
backend_metric_propagation,
ClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
auto server_it = load_report_map_.find(lrs_server_key);
@ -882,12 +936,16 @@ void LrsClient::RemoveClusterLocalityStats(
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
LoadReportState::LocalityState& locality_state = locality_it->second;
if (locality_state.locality_stats == cluster_locality_stats) {
auto propagation_it =
locality_state.propagation_stats.find(backend_metric_propagation);
if (propagation_it == locality_state.propagation_stats.end()) return;
ClusterLocalityStats* locality_stats = propagation_it->second;
if (locality_stats == cluster_locality_stats) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
locality_state.deleted_locality_stats +=
locality_state.locality_stats->GetSnapshotAndReset();
locality_state.locality_stats = nullptr;
locality_stats->GetSnapshotAndReset();
locality_state.propagation_stats.erase(propagation_it);
}
}
@ -940,19 +998,22 @@ LrsClient::ClusterLoadReportMap LrsClient::BuildLoadReportSnapshotLocked(
ClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
locality_snapshot = std::move(locality_state.deleted_locality_stats);
if (locality_state.locality_stats != nullptr) {
locality_snapshot +=
locality_state.locality_stats->GetSnapshotAndReset();
GRPC_TRACE_LOG(xds_client, INFO)
<< "[lrs_client " << this
<< "] cluster=" << cluster_key.first.c_str()
<< " eds_service_name=" << cluster_key.second.c_str()
<< " locality=" << locality_name->human_readable_string().c_str()
<< " locality_stats=" << locality_state.locality_stats;
for (const auto& p : locality_state.propagation_stats) {
ClusterLocalityStats* locality_stats = p.second;
if (locality_stats != nullptr) {
locality_snapshot += locality_stats->GetSnapshotAndReset();
GRPC_TRACE_LOG(xds_client, INFO)
<< "[lrs_client " << this
<< "] cluster=" << cluster_key.first.c_str()
<< " eds_service_name=" << cluster_key.second.c_str()
<< " locality=" << locality_name->human_readable_string().c_str()
<< " propagation=" << p.first->AsString()
<< " locality_stats=" << locality_stats;
}
}
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
if (locality_state.locality_stats == nullptr) {
if (locality_state.propagation_stats.empty()) {
it = load_report.locality_stats.erase(it);
} else {
++it;
@ -1034,6 +1095,20 @@ std::string LrsClient::CreateLrsInitialRequest() {
namespace {
void MaybeAddUnnamedMetric(
const LrsApiContext& context,
const LrsClient::ClusterLocalityStats::BackendMetric& backend_metric,
envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats* (*add_field)(
envoy_config_endpoint_v3_UpstreamLocalityStats*, upb_Arena*),
envoy_config_endpoint_v3_UpstreamLocalityStats* output) {
if (backend_metric.IsZero()) return;
auto* metric_proto = add_field(output, context.arena);
envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_num_requests_finished_with_metric(
metric_proto, backend_metric.num_requests_finished_with_metric);
envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_total_metric_value(
metric_proto, backend_metric.total_metric_value);
}
void LocalityStatsPopulate(
const LrsApiContext& context,
envoy_config_endpoint_v3_UpstreamLocalityStats* output,
@ -1065,6 +1140,18 @@ void LocalityStatsPopulate(
envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
output, snapshot.total_issued_requests);
// Add backend metrics.
MaybeAddUnnamedMetric(
context, snapshot.cpu_utilization,
envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_cpu_utilization,
output);
MaybeAddUnnamedMetric(
context, snapshot.mem_utilization,
envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_mem_utilization,
output);
MaybeAddUnnamedMetric(
context, snapshot.application_utilization,
envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_application_utilization,
output);
for (const auto& p : snapshot.backend_metrics) {
const std::string& metric_name = p.first;
const LrsClient::ClusterLocalityStats::BackendMetric& metric_value =

@ -33,6 +33,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/util/dual_ref_counted.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/per_cpu.h"
@ -43,6 +44,7 @@
#include "src/core/util/uri.h"
#include "src/core/util/work_serializer.h"
#include "src/core/xds/xds_client/xds_api.h"
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_locality.h"
#include "src/core/xds/xds_client/xds_metrics.h"
@ -51,6 +53,8 @@
namespace grpc_core {
bool XdsOrcaLrsPropagationChangesEnabled();
class LrsClient : public DualRefCounted<LrsClient> {
public:
// Drop stats for an xds cluster.
@ -114,6 +118,24 @@ class LrsClient : public DualRefCounted<LrsClient> {
uint64_t num_requests_finished_with_metric = 0;
double total_metric_value = 0;
BackendMetric() = default;
BackendMetric(uint64_t num_requests_finished, double value)
: num_requests_finished_with_metric(num_requests_finished),
total_metric_value(value) {}
BackendMetric(BackendMetric&& other) noexcept
: num_requests_finished_with_metric(
std::exchange(other.num_requests_finished_with_metric, 0)),
total_metric_value(std::exchange(other.total_metric_value, 0)) {}
BackendMetric& operator=(BackendMetric&& other) noexcept {
num_requests_finished_with_metric =
std::exchange(other.num_requests_finished_with_metric, 0);
total_metric_value = std::exchange(other.total_metric_value, 0);
return *this;
}
BackendMetric& operator+=(const BackendMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
@ -132,6 +154,9 @@ class LrsClient : public DualRefCounted<LrsClient> {
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
BackendMetric cpu_utilization;
BackendMetric mem_utilization;
BackendMetric application_utilization;
std::map<std::string, BackendMetric> backend_metrics;
Snapshot& operator+=(const Snapshot& other) {
@ -139,6 +164,9 @@ class LrsClient : public DualRefCounted<LrsClient> {
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
cpu_utilization += other.cpu_utilization;
mem_utilization += other.mem_utilization;
application_utilization += other.application_utilization;
for (const auto& p : other.backend_metrics) {
backend_metrics[p.first] += p.second;
}
@ -147,7 +175,9 @@ class LrsClient : public DualRefCounted<LrsClient> {
bool IsZero() const {
if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
total_error_requests != 0 || total_issued_requests != 0) {
total_error_requests != 0 || total_issued_requests != 0 ||
!cpu_utilization.IsZero() || !mem_utilization.IsZero() ||
!application_utilization.IsZero()) {
return false;
}
for (const auto& p : backend_metrics) {
@ -161,16 +191,17 @@ class LrsClient : public DualRefCounted<LrsClient> {
absl::string_view lrs_server,
absl::string_view cluster_name,
absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> name);
RefCountedPtr<XdsLocalityName> name,
RefCountedPtr<const BackendMetricPropagation>
backend_metric_propagation);
~ClusterLocalityStats() override;
// Returns a snapshot of this instance and resets all the counters.
Snapshot GetSnapshotAndReset();
void AddCallStarted();
void AddCallFinished(
const std::map<absl::string_view, double>* named_metrics,
bool fail = false);
void AddCallFinished(const BackendMetricData* backend_metrics,
bool fail = false);
XdsLocalityName* locality_name() const { return name_.get(); }
@ -181,10 +212,10 @@ class LrsClient : public DualRefCounted<LrsClient> {
std::atomic<uint64_t> total_error_requests{0};
std::atomic<uint64_t> total_issued_requests{0};
// Protects backend_metrics. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the
// call's recv_trailing_metadata and the load reporting thread.
Mutex backend_metrics_mu;
BackendMetric cpu_utilization ABSL_GUARDED_BY(backend_metrics_mu);
BackendMetric mem_utilization ABSL_GUARDED_BY(backend_metrics_mu);
BackendMetric application_utilization ABSL_GUARDED_BY(backend_metrics_mu);
std::map<std::string, BackendMetric> backend_metrics
ABSL_GUARDED_BY(backend_metrics_mu);
};
@ -194,6 +225,7 @@ class LrsClient : public DualRefCounted<LrsClient> {
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation_;
PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)};
};
@ -204,17 +236,18 @@ class LrsClient : public DualRefCounted<LrsClient> {
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
~LrsClient() override;
// Adds and removes drop stats for cluster_name and eds_service_name.
// Adds drop stats for cluster_name and eds_service_name.
RefCountedPtr<ClusterDropStats> AddClusterDropStats(
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
absl::string_view cluster_name, absl::string_view eds_service_name);
// Adds and removes locality stats for cluster_name and eds_service_name
// for the specified locality.
// Adds locality stats for cluster_name and eds_service_name for the
// specified locality with the specified backend metric propagation.
RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats(
std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
absl::string_view cluster_name, absl::string_view eds_service_name,
RefCountedPtr<XdsLocalityName> locality);
RefCountedPtr<XdsLocalityName> locality,
RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation);
// Resets connection backoff state.
void ResetBackoff();
@ -267,7 +300,9 @@ class LrsClient : public DualRefCounted<LrsClient> {
struct LoadReportState {
struct LocalityState {
ClusterLocalityStats* locality_stats = nullptr;
std::map<RefCountedPtr<const BackendMetricPropagation>,
ClusterLocalityStats*, BackendMetricPropagation::Less>
propagation_stats;
ClusterLocalityStats::Snapshot deleted_locality_stats;
};
@ -321,6 +356,8 @@ class LrsClient : public DualRefCounted<LrsClient> {
absl::string_view lrs_server, absl::string_view cluster_name,
absl::string_view eds_service_name,
const RefCountedPtr<XdsLocalityName>& locality,
const RefCountedPtr<const BackendMetricPropagation>&
backend_metric_propagation,
ClusterLocalityStats* cluster_locality_stats);
// Creates an initial LRS request.

@ -0,0 +1,64 @@
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "src/core/util/useful.h"
namespace grpc_core {
std::string BackendMetricPropagation::AsString() const {
std::vector<std::string> parts;
if (propagation_bits & kCpuUtilization) parts.push_back("cpu_utilization");
if (propagation_bits & kMemUtilization) parts.push_back("mem_utilization");
if (propagation_bits & kApplicationUtilization) {
parts.push_back("application_utilization");
}
if (propagation_bits & kNamedMetricsAll) {
parts.push_back("named_metrics.*");
} else {
// Output keys in sorted order for consistency.
std::vector<absl::string_view> keys(named_metric_keys.begin(),
named_metric_keys.end());
std::sort(keys.begin(), keys.end());
for (const auto& key : keys) {
parts.push_back(absl::StrCat("named_metrics.", key));
}
}
return absl::StrCat("{", absl::StrJoin(parts, ","), "}");
}
bool BackendMetricPropagation::operator<(
const BackendMetricPropagation& other) const {
int c = QsortCompare(propagation_bits, other.propagation_bits);
if (c != 0) return c == -1;
auto other_it = other.named_metric_keys.begin();
for (auto it = named_metric_keys.begin(); it != named_metric_keys.end();
++it) {
if (other_it == other.named_metric_keys.end()) return false;
c = QsortCompare(*it, *other_it);
if (c != 0) return c == -1;
++other_it;
}
return false;
}
} // namespace grpc_core

@ -0,0 +1,60 @@
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H
#include <string>
#include "absl/container/flat_hash_set.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
namespace grpc_core {
struct BackendMetricPropagation : public RefCounted<BackendMetricPropagation> {
static constexpr uint8_t kCpuUtilization = 1;
static constexpr uint8_t kMemUtilization = 2;
static constexpr uint8_t kApplicationUtilization = 4;
static constexpr uint8_t kNamedMetricsAll = 8;
uint8_t propagation_bits = 0;
absl::flat_hash_set<std::string> named_metric_keys;
std::string AsString() const;
bool operator==(const BackendMetricPropagation& other) const {
return propagation_bits == other.propagation_bits &&
named_metric_keys == other.named_metric_keys;
}
bool operator<(const BackendMetricPropagation& other) const;
// Sorting functor for RefCountedPtr<const BackendMetricPropagation>.
struct Less {
bool operator()(
const RefCountedPtr<const BackendMetricPropagation>& p1,
const RefCountedPtr<const BackendMetricPropagation>& p2) const {
if (p1 == nullptr || p2 == nullptr) return p1.get() < p2.get();
return *p1 < *p2;
}
};
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_BACKEND_METRIC_PROPAGATION_H

@ -252,6 +252,21 @@ message Cluster {
// from the LRS stream here.]
core.v3.ConfigSource lrs_server = 42;
// A list of metric names from ORCA load reports to propagate to LRS.
//
// For map fields in the ORCA proto, the string will be of the form ``<map_field_name>.<map_key>``.
// For example, the string ``named_metrics.foo`` will mean to look for the key ``foo`` in the ORCA
// ``named_metrics`` field.
//
// The special map key ``*`` means to report all entries in the map (e.g., ``named_metrics.*`` means to
// report all entries in the ORCA named_metrics field). Note that this should be used only with trusted
// backends.
//
// The metric names in LRS will follow the same semantics as this field. In other words, if this field
// contains ``named_metrics.foo``, then the LRS load report will include the data with that same string
// as the key.
repeated string lrs_report_endpoint_metrics = 57;
// The Metadata field can be used to provide additional information about the
// cluster. It can be used for stats, logging, and varying filter behavior.
// Fields should use reverse DNS notation to denote which entity within Envoy

@ -51,7 +51,20 @@ message UpstreamLocalityStats {
// upstream endpoints in the locality.
uint64 total_issued_requests = 8;
// Stats for multi-dimensional load balancing.
// CPU utilization stats for multi-dimensional load balancing.
// This typically comes from endpoint metrics reported via ORCA.
UnnamedEndpointLoadMetricStats cpu_utilization = 12;
// Memory utilization for multi-dimensional load balancing.
// This typically comes from endpoint metrics reported via ORCA.
UnnamedEndpointLoadMetricStats mem_utilization = 13;
// Blended application-defined utilization for multi-dimensional load balancing.
// This typically comes from endpoint metrics reported via ORCA.
UnnamedEndpointLoadMetricStats application_utilization = 14;
// Named stats for multi-dimensional load balancing.
// These typically come from endpoint metrics reported via ORCA.
repeated EndpointLoadMetricStats load_metric_stats = 5;
// Endpoint granularity stats information for this locality. This information
@ -117,6 +130,16 @@ message EndpointLoadMetricStats {
double total_metric_value = 3;
}
// Same as EndpointLoadMetricStats, except without the metric_name field.
message UnnamedEndpointLoadMetricStats {
// Number of calls that finished and included this metric.
uint64 num_requests_finished_with_metric = 1;
// Sum of metric values across all calls that finished with this metric for
// load_reporting_interval.
double total_metric_value = 2;
}
// Per cluster load stats. Envoy reports these stats a management server in a
// :ref:`LoadStatsRequest<envoy_api_msg_service.load_stats.v3.LoadStatsRequest>`
// [#not-implemented-hide:] Not configuration. TBD how to doc proto APIs.

@ -866,6 +866,7 @@ CORE_SOURCE_FILES = [
'src/core/xds/grpc/xds_transport_grpc.cc',
'src/core/xds/xds_client/lrs_client.cc',
'src/core/xds/xds_client/xds_api.cc',
'src/core/xds/xds_client/xds_backend_metric_propagation.cc',
'src/core/xds/xds_client/xds_bootstrap.cc',
'src/core/xds/xds_client/xds_client.cc',
'third_party/abseil-cpp/absl/base/internal/cycleclock.cc',

@ -1113,7 +1113,7 @@ TEST_F(TlsConfigTest, CaCertProviderUnset) {
}
//
// LRS server tests
// LRS tests
//
using LrsTest = XdsClusterTest;
@ -1160,6 +1160,92 @@ TEST_F(LrsTest, NotSelfConfigSource) {
<< decode_result.resource.status();
}
TEST_F(LrsTest, IgnoresPropagationWithoutEnvVar) {
Cluster cluster;
cluster.set_name("foo");
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
cluster.mutable_lrs_server()->mutable_self();
cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_NE(resource.lrs_load_reporting_server, nullptr);
EXPECT_EQ(*resource.lrs_load_reporting_server,
*xds_client_->bootstrap().servers().front());
ASSERT_NE(resource.lrs_backend_metric_propagation, nullptr);
EXPECT_EQ(resource.lrs_backend_metric_propagation->AsString(), "{}");
}
TEST_F(LrsTest, Propagation) {
ScopedExperimentalEnvVar env_var(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
Cluster cluster;
cluster.set_name("foo");
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
cluster.mutable_lrs_server()->mutable_self();
cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
cluster.add_lrs_report_endpoint_metrics("named_metrics.bar");
cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
cluster.add_lrs_report_endpoint_metrics("mem_utilization");
cluster.add_lrs_report_endpoint_metrics("application_utilization");
cluster.add_lrs_report_endpoint_metrics("unknown_field");
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_NE(resource.lrs_load_reporting_server, nullptr);
EXPECT_EQ(*resource.lrs_load_reporting_server,
*xds_client_->bootstrap().servers().front());
ASSERT_NE(resource.lrs_backend_metric_propagation, nullptr);
EXPECT_EQ(resource.lrs_backend_metric_propagation->AsString(),
"{cpu_utilization,mem_utilization,application_utilization,"
"named_metrics.bar,named_metrics.foo}");
}
TEST_F(LrsTest, PropagationNamedMetricsAll) {
ScopedExperimentalEnvVar env_var(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
Cluster cluster;
cluster.set_name("foo");
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
cluster.mutable_lrs_server()->mutable_self();
cluster.add_lrs_report_endpoint_metrics("named_metrics.*");
cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
ASSERT_TRUE(decode_result.name.has_value());
EXPECT_EQ(*decode_result.name, "foo");
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
ASSERT_NE(resource.lrs_load_reporting_server, nullptr);
EXPECT_EQ(*resource.lrs_load_reporting_server,
*xds_client_->bootstrap().servers().front());
ASSERT_NE(resource.lrs_backend_metric_propagation, nullptr);
EXPECT_EQ(resource.lrs_backend_metric_propagation->AsString(),
"{cpu_utilization,named_metrics.*}");
}
//
// upstream config tests
//

@ -1724,6 +1724,325 @@ TEST_P(ClientLoadReportingTest, Vanilla) {
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests ORCA to LRS propagation.
TEST_P(ClientLoadReportingTest, OrcaPropagation) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
Cluster cluster = default_cluster_;
cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
cluster.add_lrs_report_endpoint_metrics("mem_utilization");
cluster.add_lrs_report_endpoint_metrics("application_utilization");
cluster.add_lrs_report_endpoint_metrics("unknown_field");
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0; // Not propagated.
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4; // Not propagated.
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
ClientStats::LocalityStats::LoadMetric cpu_utilization;
ClientStats::LocalityStats::LoadMetric mem_utilization;
ClientStats::LocalityStats::LoadMetric application_utilization;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
cpu_utilization += p.second.cpu_utilization;
mem_utilization += p.second.mem_utilization;
application_utilization += p.second.application_utilization;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(
cpu_utilization,
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.8 +
(kNumFailuresPerAddress * backends_.size()) * 0.4));
EXPECT_THAT(
mem_utilization,
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.6 +
(kNumFailuresPerAddress * backends_.size()) * 0.3));
EXPECT_THAT(
application_utilization,
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 0.4 +
(kNumFailuresPerAddress * backends_.size()) * 0.2));
EXPECT_THAT(
named_metrics_total,
::testing::UnorderedElementsAre(::testing::Pair(
"named_metrics.foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.3))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
TEST_P(ClientLoadReportingTest, OrcaPropagationNamedMetricsAll) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
Cluster cluster = default_cluster_;
cluster.add_lrs_report_endpoint_metrics("named_metrics.*");
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(
named_metrics_total,
::testing::UnorderedElementsAre(
::testing::Pair(
"named_metrics.foo",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 1.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.3)),
::testing::Pair(
"named_metrics.bar",
LoadMetricEq(
(kNumRpcsPerAddress + kNumFailuresPerAddress) *
backends_.size(),
(kNumRpcsPerAddress * backends_.size()) * 2.0 +
(kNumFailuresPerAddress * backends_.size()) * 0.4))));
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
TEST_P(ClientLoadReportingTest, OrcaPropagationNotConfigured) {
grpc_core::testing::ScopedExperimentalEnvVar env(
"GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
CreateAndStartBackends(4);
const size_t kNumRpcsPerAddress = 10;
const size_t kNumFailuresPerAddress = 3;
EdsResourceArgs args({
{"locality0", CreateEndpointsForBackends(0, 2)},
{"locality1", CreateEndpointsForBackends(2, 4)},
});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Wait until all backends are ready.
size_t num_warmup_rpcs =
WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
WaitForBackendOptions().set_reset_counters(false));
// Send kNumRpcsPerAddress RPCs per server with named metrics.
xds::data::orca::v3::OrcaLoadReport backend_metrics;
backend_metrics.set_cpu_utilization(0.8);
backend_metrics.set_mem_utilization(0.6);
backend_metrics.set_application_utilization(0.4);
auto& named_metrics = (*backend_metrics.mutable_named_metrics());
named_metrics["foo"] = 1.0;
named_metrics["bar"] = 2.0;
CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
RpcOptions().set_backend_metrics(backend_metrics));
backend_metrics.set_cpu_utilization(0.4);
backend_metrics.set_mem_utilization(0.3);
backend_metrics.set_application_utilization(0.2);
named_metrics["foo"] = 0.3;
named_metrics["bar"] = 0.4;
for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
RpcOptions().set_server_fail(true).set_backend_metrics(
backend_metrics));
}
const size_t total_successful_rpcs_sent =
(kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
const size_t total_failed_rpcs_sent =
kNumFailuresPerAddress * backends_.size();
// Check that the backends got the right number of requests.
size_t total_rpcs_sent = 0;
for (const auto& backend : backends_) {
total_rpcs_sent += backend->backend_service()->request_count();
}
EXPECT_EQ(total_rpcs_sent,
total_successful_rpcs_sent + total_failed_rpcs_sent);
// The load report received at the balancer should be correct.
std::vector<ClientStats> load_report =
balancer_->lrs_service()->WaitForLoadReport();
ASSERT_EQ(load_report.size(), 1UL);
ClientStats& client_stats = load_report.front();
EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
EXPECT_EQ(total_successful_rpcs_sent,
client_stats.total_successful_requests());
EXPECT_EQ(0U, client_stats.total_requests_in_progress());
EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
EXPECT_EQ(0U, client_stats.total_dropped_requests());
ASSERT_THAT(
client_stats.locality_stats(),
::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
::testing::Pair("locality1", ::testing::_)));
size_t num_successful_rpcs = 0;
size_t num_failed_rpcs = 0;
std::map<std::string, ClientStats::LocalityStats::LoadMetric>
named_metrics_total;
for (const auto& p : client_stats.locality_stats()) {
EXPECT_EQ(p.second.total_requests_in_progress, 0U);
EXPECT_EQ(
p.second.total_issued_requests,
p.second.total_successful_requests + p.second.total_error_requests);
num_successful_rpcs += p.second.total_successful_requests;
num_failed_rpcs += p.second.total_error_requests;
for (const auto& s : p.second.load_metrics) {
named_metrics_total[s.first] += s.second;
}
}
EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
EXPECT_THAT(named_metrics_total, ::testing::UnorderedElementsAre());
// The LRS service got a single request, and sent a single response.
EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
// Tests send_all_clusters.
TEST_P(ClientLoadReportingTest, SendAllClusters) {
CreateAndStartBackends(2);

@ -304,6 +304,18 @@ class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
if (request->has_param() && request->param().has_backend_metrics()) {
const auto& request_metrics = request->param().backend_metrics();
auto* recorder = context->ExperimentalGetCallMetricRecorder();
if (request_metrics.cpu_utilization() != 0) {
recorder->RecordCpuUtilizationMetric(
request_metrics.cpu_utilization());
}
if (request_metrics.mem_utilization() != 0) {
recorder->RecordMemoryUtilizationMetric(
request_metrics.mem_utilization());
}
if (request_metrics.application_utilization() != 0) {
recorder->RecordApplicationUtilizationMetric(
request_metrics.application_utilization());
}
for (const auto& p : request_metrics.named_metrics()) {
char* key = static_cast<char*>(
grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));

@ -617,8 +617,19 @@ class LrsServiceImpl
// Stats for a given locality.
struct LocalityStats {
struct LoadMetric {
uint64_t num_requests_finished_with_metric;
double total_metric_value;
uint64_t num_requests_finished_with_metric = 0;
double total_metric_value = 0;
LoadMetric() = default;
// Works for both EndpointLoadMetricStats and
// UnnamedEndpointLoadMetricStats.
template <typename T>
explicit LoadMetric(const T& stats)
: num_requests_finished_with_metric(
stats.num_requests_finished_with_metric()),
total_metric_value(stats.total_metric_value()) {}
LoadMetric& operator+=(const LoadMetric& other) {
num_requests_finished_with_metric +=
other.num_requests_finished_with_metric;
@ -640,10 +651,13 @@ class LrsServiceImpl
total_error_requests(
upstream_locality_stats.total_error_requests()),
total_issued_requests(
upstream_locality_stats.total_issued_requests()) {
upstream_locality_stats.total_issued_requests()),
cpu_utilization(upstream_locality_stats.cpu_utilization()),
mem_utilization(upstream_locality_stats.mem_utilization()),
application_utilization(
upstream_locality_stats.application_utilization()) {
for (const auto& s : upstream_locality_stats.load_metric_stats()) {
load_metrics[s.metric_name()] += LoadMetric{
s.num_requests_finished_with_metric(), s.total_metric_value()};
load_metrics[s.metric_name()] += LoadMetric(s);
}
}
@ -652,6 +666,9 @@ class LrsServiceImpl
total_requests_in_progress += other.total_requests_in_progress;
total_error_requests += other.total_error_requests;
total_issued_requests += other.total_issued_requests;
cpu_utilization += other.cpu_utilization;
mem_utilization += other.mem_utilization;
application_utilization += other.application_utilization;
for (const auto& p : other.load_metrics) {
load_metrics[p.first] += p.second;
}
@ -662,6 +679,9 @@ class LrsServiceImpl
uint64_t total_requests_in_progress = 0;
uint64_t total_error_requests = 0;
uint64_t total_issued_requests = 0;
LoadMetric cpu_utilization;
LoadMetric mem_utilization;
LoadMetric application_utilization;
std::map<std::string, LoadMetric> load_metrics;
};

@ -3044,6 +3044,8 @@ src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/lrs_client.h \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_api.h \
src/core/xds/xds_client/xds_backend_metric_propagation.cc \
src/core/xds/xds_client/xds_backend_metric_propagation.h \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_bootstrap.h \
src/core/xds/xds_client/xds_channel_args.h \

@ -2821,6 +2821,8 @@ src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/lrs_client.h \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_api.h \
src/core/xds/xds_client/xds_backend_metric_propagation.cc \
src/core/xds/xds_client/xds_backend_metric_propagation.h \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_bootstrap.h \
src/core/xds/xds_client/xds_channel_args.h \

Loading…
Cancel
Save