[WRR] Prefer application_utilization to cpu_utilization (#33355)

<!--

If you know who should review your pull request, please assign it to that
person; otherwise the pull request will be assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->
pull/33373/head
Yousuk Seung 2 years ago committed by GitHub
parent 18c42a21af
commit c03cd744b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      CMakeLists.txt
  2. 8
      bazel/grpc_deps.bzl
  3. 8
      build_autogenerated.yaml
  4. 8
      include/grpcpp/ext/call_metric_recorder.h
  5. 6
      include/grpcpp/ext/server_metric_recorder.h
  6. 5
      src/core/ext/filters/backend_metrics/backend_metric_filter.cc
  7. 2
      src/core/ext/filters/client_channel/backend_metric.cc
  8. 3
      src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h
  9. 33
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  10. 13
      src/core/ext/upb-generated/xds/data/orca/v3/orca_load_report.upb.c
  11. 15
      src/core/ext/upb-generated/xds/data/orca/v3/orca_load_report.upb.h
  12. 60
      src/cpp/server/backend_metric_recorder.cc
  13. 3
      src/cpp/server/backend_metric_recorder.h
  14. 4
      src/cpp/server/orca/orca_service.cc
  15. 8
      src/proto/grpc/testing/xds/v3/orca_load_report.proto
  16. 262
      test/core/client_channel/lb_policy/weighted_round_robin_test.cc
  17. 54
      test/cpp/end2end/client_lb_end2end_test.cc
  18. 22
      test/cpp/end2end/orca_service_end2end_test.cc
  19. 11
      test/cpp/end2end/xds/xds_wrr_end2end_test.cc
  20. 2
      third_party/xds
  21. 2
      tools/run_tests/sanity/check_submodules.sh

12
CMakeLists.txt generated

@ -394,18 +394,18 @@ if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/xds)
# Download the archive via HTTP, validate the checksum, and extract to third_party/xds.
download_archive(
${CMAKE_CURRENT_SOURCE_DIR}/third_party/xds
https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz
aef36c29bd0ef95509f7f52693dbdafe4a2c2c5d1eb406bf68e6364a0d12e11b
xds-4003588d1b747e37e911baa5a9c1c07fde4ca518
https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz
0d33b83f8c6368954e72e7785539f0d272a8aba2f6e2e336ed15fd1514bc9899
xds-e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7
)
endif()
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/xds)
# Download the archive via HTTP, validate the checksum, and extract to third_party/xds.
download_archive(
${CMAKE_CURRENT_SOURCE_DIR}/third_party/xds
https://github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz
aef36c29bd0ef95509f7f52693dbdafe4a2c2c5d1eb406bf68e6364a0d12e11b
xds-4003588d1b747e37e911baa5a9c1c07fde4ca518
https://github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz
0d33b83f8c6368954e72e7785539f0d272a8aba2f6e2e336ed15fd1514bc9899
xds-e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7
)
endif()

@ -502,11 +502,11 @@ def grpc_deps():
if "com_github_cncf_udpa" not in native.existing_rules():
http_archive(
name = "com_github_cncf_udpa",
sha256 = "aef36c29bd0ef95509f7f52693dbdafe4a2c2c5d1eb406bf68e6364a0d12e11b",
strip_prefix = "xds-4003588d1b747e37e911baa5a9c1c07fde4ca518",
sha256 = "0d33b83f8c6368954e72e7785539f0d272a8aba2f6e2e336ed15fd1514bc9899",
strip_prefix = "xds-e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7",
urls = [
"https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz",
"https://github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz",
"https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz",
"https://github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz",
],
)

@ -14119,10 +14119,10 @@ external_proto_libraries:
- https://storage.googleapis.com/grpc-bazel-mirror/github.com/census-instrumentation/opencensus-proto/archive/v0.3.0.tar.gz
- https://github.com/census-instrumentation/opencensus-proto/archive/v0.3.0.tar.gz
- destination: third_party/xds
hash: aef36c29bd0ef95509f7f52693dbdafe4a2c2c5d1eb406bf68e6364a0d12e11b
hash: 0d33b83f8c6368954e72e7785539f0d272a8aba2f6e2e336ed15fd1514bc9899
proto_prefix: third_party/xds/
strip_prefix: xds-4003588d1b747e37e911baa5a9c1c07fde4ca518
strip_prefix: xds-e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7
urls:
- https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz
- https://github.com/cncf/xds/archive/4003588d1b747e37e911baa5a9c1c07fde4ca518.tar.gz
- https://storage.googleapis.com/grpc-bazel-mirror/github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz
- https://github.com/cncf/xds/archive/e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7.tar.gz
tests: []

@ -50,6 +50,14 @@ class CallMetricRecorder {
/// Values outside of the valid range [0, 1] are ignored.
virtual CallMetricRecorder& RecordMemoryUtilizationMetric(double value) = 0;
/// Records a call metric measurement for application specific utilization.
/// Multiple calls to this method will override the stored value.
/// Values may be larger than 1.0 when the usage exceeds the
/// reporter-dependent notion of soft limits.
/// Values outside of the valid range [0, infy], i.e. negative values, are
/// ignored and leave any previously recorded value in place.
/// Returns a reference to the recorder so that calls can be chained.
virtual CallMetricRecorder& RecordApplicationUtilizationMetric(
double value) = 0;
/// Records a call metric measurement for queries per second.
/// Multiple calls to this method will override the stored value.
/// Values outside of the valid range [0, infy) are ignored.

@ -52,6 +52,10 @@ class ServerMetricRecorder {
/// Values outside of the valid range are rejected.
/// Overrides the stored value when called again with a valid value.
void SetMemoryUtilization(double value);
/// Records the application specific utilization in the range [0, infy].
/// Unlike memory utilization, the value may exceed 1.0; only negative values
/// (and NaN) fall outside of the valid range.
/// Values outside of the valid range are rejected and leave any previously
/// stored value untouched.
/// Overrides the stored value when called again with a valid value.
void SetApplicationUtilization(double value);
/// Records number of queries per second to the server in the range [0, infy).
/// Values outside of the valid range are rejected.
/// Overrides the stored value when called again with a valid value.
@ -77,6 +81,8 @@ class ServerMetricRecorder {
void ClearCpuUtilization();
/// Clears the server memory utilization if recorded.
void ClearMemoryUtilization();
/// Clears the application specific utilization if recorded.
/// The stored value reverts to the -1 "not recorded" marker.
void ClearApplicationUtilization();
/// Clears number of queries per second to the server if recorded.
void ClearQps();
/// Clears number of errors per second to the server if recorded.

@ -67,6 +67,11 @@ absl::optional<std::string> BackendMetricFilter::MaybeSerializeBackendMetrics(
data.mem_utilization);
has_data = true;
}
if (data.application_utilization != -1) {
xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
response, data.application_utilization);
has_data = true;
}
if (data.qps != -1) {
xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
has_data = true;

@ -69,6 +69,8 @@ const BackendMetricData* ParseBackendMetricData(
xds_data_orca_v3_OrcaLoadReport_cpu_utilization(msg);
backend_metric_data->mem_utilization =
xds_data_orca_v3_OrcaLoadReport_mem_utilization(msg);
backend_metric_data->application_utilization =
xds_data_orca_v3_OrcaLoadReport_application_utilization(msg);
backend_metric_data->qps =
xds_data_orca_v3_OrcaLoadReport_rps_fractional(msg);
backend_metric_data->eps = xds_data_orca_v3_OrcaLoadReport_eps(msg);

@ -32,6 +32,9 @@ struct BackendMetricData {
/// Memory utilization expressed as a fraction of available memory
/// resources.
double mem_utilization = -1;
/// Application specific utilization expressed as a fraction of available
/// resources. May exceed 1.0 when usage goes past the reporter's soft limit.
/// The default of -1 marks the value as not reported.
double application_utilization = -1;
/// Total queries per second being served by the backend across all services.
double qps = -1;
/// Total errors per second reported by the backend across all services.

@ -158,7 +158,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
: wrr_(std::move(wrr)), key_(std::move(key)) {}
~AddressWeight() override;
void MaybeUpdateWeight(double qps, double eps, double cpu_utilization,
void MaybeUpdateWeight(double qps, double eps, double utilization,
float error_utilization_penalty);
float GetWeight(Timestamp now, Duration weight_expiration_period,
@ -398,23 +398,23 @@ WeightedRoundRobin::AddressWeight::~AddressWeight() {
}
void WeightedRoundRobin::AddressWeight::MaybeUpdateWeight(
double qps, double eps, double cpu_utilization,
double qps, double eps, double utilization,
float error_utilization_penalty) {
// Compute weight.
float weight = 0;
if (qps > 0 && cpu_utilization > 0) {
if (qps > 0 && utilization > 0) {
double penalty = 0.0;
if (eps > 0 && error_utilization_penalty > 0) {
penalty = eps / qps * error_utilization_penalty;
}
weight = qps / (cpu_utilization + penalty);
weight = qps / (utilization + penalty);
}
if (weight == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] subchannel %s: qps=%f, eps=%f, cpu_utilization=%f: "
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f: "
"error_util_penalty=%f, weight=%f (not updating)",
wrr_.get(), key_.c_str(), qps, eps, cpu_utilization,
wrr_.get(), key_.c_str(), qps, eps, utilization,
error_utilization_penalty, weight);
}
return;
@ -424,10 +424,10 @@ void WeightedRoundRobin::AddressWeight::MaybeUpdateWeight(
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
"[WRR %p] subchannel %s: qps=%f, eps=%f, cpu_utilization=%f "
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f "
"error_util_penalty=%f : setting weight=%f weight_=%f now=%s "
"last_update_time_=%s non_empty_since_=%s",
wrr_.get(), key_.c_str(), qps, eps, cpu_utilization,
wrr_.get(), key_.c_str(), qps, eps, utilization,
error_utilization_penalty, weight, weight_, now.ToString().c_str(),
last_update_time_.ToString().c_str(),
non_empty_since_.ToString().c_str());
@ -483,14 +483,16 @@ void WeightedRoundRobin::Picker::SubchannelCallTracker::Finish(
args.backend_metric_accessor->GetBackendMetricData();
double qps = 0;
double eps = 0;
double cpu_utilization = 0;
double utilization = 0;
if (backend_metric_data != nullptr) {
qps = backend_metric_data->qps;
eps = backend_metric_data->eps;
cpu_utilization = backend_metric_data->cpu_utilization;
utilization = backend_metric_data->application_utilization;
if (utilization <= 0) {
utilization = backend_metric_data->cpu_utilization;
}
}
weight_->MaybeUpdateWeight(qps, eps, cpu_utilization,
error_utilization_penalty_);
weight_->MaybeUpdateWeight(qps, eps, utilization, error_utilization_penalty_);
}
//
@ -847,9 +849,12 @@ void WeightedRoundRobin::WeightedRoundRobinSubchannelList::
void WeightedRoundRobin::WeightedRoundRobinSubchannelData::OobWatcher::
OnBackendMetricReport(const BackendMetricData& backend_metric_data) {
double utilization = backend_metric_data.application_utilization;
if (utilization <= 0) {
utilization = backend_metric_data.cpu_utilization;
}
weight_->MaybeUpdateWeight(backend_metric_data.qps, backend_metric_data.eps,
backend_metric_data.cpu_utilization,
error_utilization_penalty_);
utilization, error_utilization_penalty_);
}
//

@ -22,7 +22,7 @@ static const upb_MiniTableSub xds_data_orca_v3_OrcaLoadReport_submsgs[3] = {
{.submsg = &xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry_msg_init},
};
static const upb_MiniTableField xds_data_orca_v3_OrcaLoadReport__fields[8] = {
static const upb_MiniTableField xds_data_orca_v3_OrcaLoadReport__fields[9] = {
{1, UPB_SIZE(16, 0), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
{2, UPB_SIZE(24, 8), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
{3, UPB_SIZE(32, 16), 0, kUpb_NoSub, 4, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
@ -31,12 +31,13 @@ static const upb_MiniTableField xds_data_orca_v3_OrcaLoadReport__fields[8] = {
{6, 40, 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
{7, 48, 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
{8, UPB_SIZE(8, 56), 0, 2, 11, kUpb_FieldMode_Map | (UPB_SIZE(kUpb_FieldRep_4Byte, kUpb_FieldRep_8Byte) << kUpb_FieldRep_Shift)},
{9, UPB_SIZE(56, 64), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)},
};
const upb_MiniTable xds_data_orca_v3_OrcaLoadReport_msg_init = {
&xds_data_orca_v3_OrcaLoadReport_submsgs[0],
&xds_data_orca_v3_OrcaLoadReport__fields[0],
UPB_SIZE(56, 64), 8, kUpb_ExtMode_NonExtendable, 8, UPB_FASTTABLE_MASK(56), 0,
UPB_SIZE(64, 72), 9, kUpb_ExtMode_NonExtendable, 9, UPB_FASTTABLE_MASK(120), 0,
UPB_FASTTABLE_INIT({
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x000000003f000009, &upb_psf8_1bt},
@ -46,6 +47,14 @@ const upb_MiniTable xds_data_orca_v3_OrcaLoadReport_msg_init = {
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x002800003f000031, &upb_psf8_1bt},
{0x003000003f000039, &upb_psf8_1bt},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x004000003f000049, &upb_psf8_1bt},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
{0x0000000000000000, &_upb_FastDecoder_DecodeGeneric},
})
};

@ -189,6 +189,17 @@ UPB_INLINE const xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry* xds_data_orc
if (!map) return NULL;
return (const xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry*)_upb_map_next(map, iter);
}
/* Clears field 9 (application_utilization) of an OrcaLoadReport message. */
UPB_INLINE void xds_data_orca_v3_OrcaLoadReport_clear_application_utilization(xds_data_orca_v3_OrcaLoadReport* msg) {
const upb_MiniTableField field = {9, UPB_SIZE(56, 64), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)};
_upb_Message_ClearNonExtensionField(msg, &field);
}
/* Reads field 9 (application_utilization) of an OrcaLoadReport message.
 * A default of 0 is handed to the upb getter; presumably that is the value
 * returned when the field was never set -- confirm against upb. */
UPB_INLINE double xds_data_orca_v3_OrcaLoadReport_application_utilization(const xds_data_orca_v3_OrcaLoadReport* msg) {
double default_val = 0;
double ret;
const upb_MiniTableField field = {9, UPB_SIZE(56, 64), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)};
_upb_Message_GetNonExtensionField(msg, &field, &default_val, &ret);
return ret;
}
UPB_INLINE void xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(xds_data_orca_v3_OrcaLoadReport *msg, double value) {
const upb_MiniTableField field = {1, UPB_SIZE(16, 0), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)};
@ -282,6 +293,10 @@ UPB_INLINE xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry* xds_data_orca_v3_O
if (!map) return NULL;
return (xds_data_orca_v3_OrcaLoadReport_NamedMetricsEntry*)_upb_map_next(map, iter);
}
/* Stores `value` into field 9 (application_utilization) of an OrcaLoadReport
 * message. */
UPB_INLINE void xds_data_orca_v3_OrcaLoadReport_set_application_utilization(xds_data_orca_v3_OrcaLoadReport *msg, double value) {
const upb_MiniTableField field = {9, UPB_SIZE(56, 64), 0, kUpb_NoSub, 1, kUpb_FieldMode_Scalar | (kUpb_FieldRep_8Byte << kUpb_FieldRep_Shift)};
_upb_Message_SetNonExtensionField(msg, &field, &value);
}
/* xds.data.orca.v3.OrcaLoadReport.RequestCostEntry */

@ -34,8 +34,8 @@
using grpc_core::BackendMetricData;
namespace {
// CPU utilization values must be in [0, infy).
bool IsCpuUtilizationValid(double cpu) { return cpu >= 0.0; }
// Validity check shared by the utilization metrics that have only a soft
// upper limit (CPU and application utilization): a value is accepted when it
// is not negative. There is no upper bound, and NaN fails the comparison.
bool IsUtilizationWithSoftLimitsValid(double util) {
  const bool non_negative = 0.0 <= util;
  return non_negative;
}
// Other utilization values must be in [0, 1].
bool IsUtilizationValid(double utilization) {
@ -68,7 +68,7 @@ void ServerMetricRecorder::UpdateBackendMetricDataState(
}
void ServerMetricRecorder::SetCpuUtilization(double value) {
if (!IsCpuUtilizationValid(value)) {
if (!IsUtilizationWithSoftLimitsValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization rejected: %f", this, value);
}
@ -95,6 +95,22 @@ void ServerMetricRecorder::SetMemoryUtilization(double value) {
}
}
void ServerMetricRecorder::SetApplicationUtilization(double value) {
if (!IsUtilizationWithSoftLimitsValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Application utilization rejected: %f", this,
value);
}
return;
}
UpdateBackendMetricDataState([value](BackendMetricData* data) {
data->application_utilization = value;
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Application utilization set: %f", this, value);
}
}
void ServerMetricRecorder::SetQps(double value) {
if (!IsRateValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
@ -172,6 +188,14 @@ void ServerMetricRecorder::ClearMemoryUtilization() {
}
}
void ServerMetricRecorder::ClearApplicationUtilization() {
UpdateBackendMetricDataState(
[](BackendMetricData* data) { data->application_utilization = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] Application utilization cleared.", this);
}
}
void ServerMetricRecorder::ClearQps() {
UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
@ -212,9 +236,10 @@ ServerMetricRecorder::GetMetricsIfChanged() const {
const auto& data = result->data;
gpr_log(GPR_INFO,
"[%p] GetMetrics() returned: seq:%" PRIu64
" cpu:%f mem:%f qps:%f eps:%f utilization size: %" PRIuPTR,
" cpu:%f mem:%f app:%f qps:%f eps:%f utilization size: %" PRIuPTR,
this, result->sequence_number, data.cpu_utilization,
data.mem_utilization, data.qps, data.eps, data.utilization.size());
data.mem_utilization, data.application_utilization, data.qps,
data.eps, data.utilization.size());
}
return result;
}
@ -223,7 +248,7 @@ ServerMetricRecorder::GetMetricsIfChanged() const {
experimental::CallMetricRecorder&
BackendMetricState::RecordCpuUtilizationMetric(double value) {
if (!IsCpuUtilizationValid(value)) {
if (!IsUtilizationWithSoftLimitsValid(value)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
gpr_log(GPR_INFO, "[%p] CPU utilization value rejected: %f", this, value);
}
@ -251,6 +276,22 @@ BackendMetricState::RecordMemoryUtilizationMetric(double value) {
return *this;
}
// Per-call recording of application utilization.
// An accepted value (see IsUtilizationWithSoftLimitsValid()) is written to the
// atomic with relaxed ordering; a rejected value leaves the atomic unchanged.
// Either outcome is logged when backend-metric tracing is enabled.
// Always returns *this so that recorder calls can be chained.
experimental::CallMetricRecorder&
BackendMetricState::RecordApplicationUtilizationMetric(double value) {
  if (IsUtilizationWithSoftLimitsValid(value)) {
    application_utilization_.store(value, std::memory_order_relaxed);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
      gpr_log(GPR_INFO, "[%p] Application utilization recorded: %f", this,
              value);
    }
  } else if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
    gpr_log(GPR_INFO, "[%p] Application utilization value rejected: %f", this,
            value);
  }
  return *this;
}
experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
double value) {
if (!IsRateValid(value)) {
@ -333,13 +374,18 @@ BackendMetricData BackendMetricState::GetBackendMetricData() {
}
// Only overwrite if the value is set i.e. in the valid range.
const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
if (IsCpuUtilizationValid(cpu)) {
if (IsUtilizationWithSoftLimitsValid(cpu)) {
data.cpu_utilization = cpu;
}
const double mem = mem_utilization_.load(std::memory_order_relaxed);
if (IsUtilizationValid(mem)) {
data.mem_utilization = mem;
}
const double app_util =
application_utilization_.load(std::memory_order_relaxed);
if (IsUtilizationWithSoftLimitsValid(app_util)) {
data.application_utilization = app_util;
}
const double qps = qps_.load(std::memory_order_relaxed);
if (IsRateValid(qps)) {
data.qps = qps;

@ -58,6 +58,8 @@ class BackendMetricState : public grpc_core::BackendMetricProvider,
double value) override;
experimental::CallMetricRecorder& RecordMemoryUtilizationMetric(
double value) override;
experimental::CallMetricRecorder& RecordApplicationUtilizationMetric(
double value) override;
experimental::CallMetricRecorder& RecordQpsMetric(double value) override;
experimental::CallMetricRecorder& RecordEpsMetric(double value) override;
experimental::CallMetricRecorder& RecordUtilizationMetric(
@ -73,6 +75,7 @@ class BackendMetricState : public grpc_core::BackendMetricProvider,
experimental::ServerMetricRecorder* server_metric_recorder_;
std::atomic<double> cpu_utilization_{-1.0};
std::atomic<double> mem_utilization_{-1.0};
std::atomic<double> application_utilization_{-1.0};
std::atomic<double> qps_{-1.0};
std::atomic<double> eps_{-1.0};
internal::Mutex mu_;

@ -203,6 +203,10 @@ Slice OrcaService::GetOrCreateSerializedResponse() {
xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response,
data.mem_utilization);
}
if (data.application_utilization != -1) {
xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
response, data.application_utilization);
}
if (data.qps != -1) {
xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
}

@ -52,5 +52,13 @@ message OrcaLoadReport {
// Application specific opaque metrics.
map<string, double> named_metrics = 8;
// Application specific utilization expressed as a fraction of available
// resources. For example, an application may report the max of CPU and memory
// utilization for better load balancing if it is both CPU and memory bound.
// This should be derived from the latest sample or measurement.
// The value may be larger than 1.0 when the usage exceeds the reporter
// dependent notion of soft limits.
double application_utilization = 9;
}

@ -56,10 +56,12 @@ namespace grpc_core {
namespace testing {
namespace {
BackendMetricData MakeBackendMetricData(double cpu_utilization, double qps,
double eps) {
BackendMetricData MakeBackendMetricData(double app_utilization, double qps,
double eps,
double cpu_utilization = 0) {
BackendMetricData b;
b.cpu_utilization = cpu_utilization;
b.application_utilization = app_utilization;
b.qps = qps;
b.eps = eps;
return b;
@ -193,6 +195,8 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
backend_metric_data->qps = it->second.qps;
backend_metric_data->eps = it->second.eps;
backend_metric_data->cpu_utilization = it->second.cpu_utilization;
backend_metric_data->application_utilization =
it->second.application_utilization;
}
FakeMetadata metadata({});
FakeBackendMetricAccessor backend_metric_accessor(
@ -213,6 +217,8 @@ class WeightedRoundRobinTest : public TimeAwareLoadBalancingPolicyTest {
backend_metric_data.qps = p.second.qps;
backend_metric_data.eps = p.second.eps;
backend_metric_data.cpu_utilization = p.second.cpu_utilization;
backend_metric_data.application_utilization =
p.second.application_utilization;
subchannel->SendOobBackendMetricReport(backend_metric_data);
}
}
@ -334,24 +340,92 @@ TEST_F(WeightedRoundRobinTest, Basic) {
// No utilization report from backend 2, so it gets the average weight 2.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Now have backend 2 report utilization the same as backend 1, so its
// weight will be the same.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
// Per-call reports that carry no application utilization (0) must fall back
// to CPU utilization when weights are computed.
TEST_F(WeightedRoundRobinTest, CpuUtilWithNoAppUtil) {
  // Hand the three backends to the LB policy and wait for a usable picker.
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
  ASSERT_NE(picker, nullptr);
  // The two load reports used below: only the CPU utilization differs.
  const BackendMetricData cpu_at_90 = MakeBackendMetricData(
      /*app_utilization=*/0, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.9);
  const BackendMetricData cpu_at_30 = MakeBackendMetricData(
      /*app_utilization=*/0, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.3);
  // Round 1: address 0 ends up with weight 1 and address 1 with weight 3.
  // Address 2 reports nothing and therefore gets the average weight 2.
  WaitForWeightedRoundRobinPicks(
      &picker, {{kAddresses[0], cpu_at_90}, {kAddresses[1], cpu_at_30}},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
  // Round 2: address 2 now reports the same load as address 1, so both end
  // up with the same weight.
  WaitForWeightedRoundRobinPicks(
      &picker,
      {{kAddresses[0], cpu_at_90},
       {kAddresses[1], cpu_at_30},
       {kAddresses[2], cpu_at_30}},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
// When both application and CPU utilization are present in per-call reports,
// the expected weights follow application utilization; the CPU values are
// deliberately set to unrelated numbers.
TEST_F(WeightedRoundRobinTest, AppUtilOverCpuUtil) {
  // Hand the three backends to the LB policy and wait for a usable picker.
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = SendInitialUpdateAndWaitForConnected(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Round 1 reports: application utilization 0.9 vs 0.3.
  const BackendMetricData first_round_addr0 = MakeBackendMetricData(
      /*app_utilization=*/0.9, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.3);
  const BackendMetricData first_round_addr1 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.4);
  // Address 0 ends up with weight 1 and address 1 with weight 3.
  // Address 2 reports nothing and therefore gets the average weight 2.
  WaitForWeightedRoundRobinPicks(
      &picker,
      {{kAddresses[0], first_round_addr0}, {kAddresses[1], first_round_addr1}},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
  // Round 2 reports: same application utilization, different CPU values, and
  // address 2 now reports the same application utilization as address 1.
  const BackendMetricData second_round_addr0 = MakeBackendMetricData(
      /*app_utilization=*/0.9, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.2);
  const BackendMetricData second_round_addr1 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.6);
  const BackendMetricData second_round_addr2 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.5);
  WaitForWeightedRoundRobinPicks(
      &picker,
      {{kAddresses[0], second_round_addr0},
       {kAddresses[1], second_round_addr1},
       {kAddresses[2], second_round_addr2}},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
TEST_F(WeightedRoundRobinTest, Eps) {
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
@ -362,11 +436,11 @@ TEST_F(WeightedRoundRobinTest, Eps) {
// Expected weights: 1/(0.1+0.5) : 1/(0.1+0.2) : 1/(0.1+0.1) = 1:2:3
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/50.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/20.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/10.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 2}, {kAddresses[2], 3}});
}
@ -385,20 +459,20 @@ TEST_F(WeightedRoundRobinTest, IgnoresDuplicateAddresses) {
// No utilization report from backend 2, so it gets the average weight 2.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Now have backend 2 report utilization the same as backend 1, so its
// weight will be the same.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
@ -425,9 +499,9 @@ TEST_F(WeightedRoundRobinTest, OobReporting) {
// Address 0 gets weight 1, address 1 gets weight 3.
// No utilization report from backend 2, so it gets the average weight 2.
ReportOobBackendMetrics(
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}});
WaitForWeightedRoundRobinPicks(
&picker, {},
@ -435,11 +509,11 @@ TEST_F(WeightedRoundRobinTest, OobReporting) {
// Now have backend 2 report utilization the same as backend 1, so its
// weight will be the same.
ReportOobBackendMetrics(
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}});
WaitForWeightedRoundRobinPicks(
&picker, {},
@ -452,6 +526,94 @@ TEST_F(WeightedRoundRobinTest, OobReporting) {
}
}
// Out-of-band reports that carry no application utilization (0) must fall
// back to CPU utilization when weights are computed.
TEST_F(WeightedRoundRobinTest, OobReportingCpuUtilWithNoAppUtil) {
  // Hand the three backends to the LB policy with OOB load reporting enabled.
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = SendInitialUpdateAndWaitForConnected(
      kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
  ASSERT_NE(picker, nullptr);
  // The two load reports used below: only the CPU utilization differs.
  const BackendMetricData cpu_at_90 = MakeBackendMetricData(
      /*app_utilization=*/0, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.9);
  const BackendMetricData cpu_at_30 = MakeBackendMetricData(
      /*app_utilization=*/0, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.3);
  // Round 1: address 0 ends up with weight 1 and address 1 with weight 3.
  // Address 2 reports nothing and therefore gets the average weight 2.
  ReportOobBackendMetrics(
      {{kAddresses[0], cpu_at_90}, {kAddresses[1], cpu_at_30}});
  WaitForWeightedRoundRobinPicks(
      &picker, {},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
  // Round 2: address 2 now reports the same load as address 1, so both end
  // up with the same weight.
  ReportOobBackendMetrics({{kAddresses[0], cpu_at_90},
                           {kAddresses[1], cpu_at_30},
                           {kAddresses[2], cpu_at_30}});
  WaitForWeightedRoundRobinPicks(
      &picker, {},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
  // Every subchannel must still use the default OOB reporting interval.
  for (const auto& addr : kAddresses) {
    auto* sc = FindSubchannel(addr);
    ASSERT_NE(sc, nullptr);
    sc->CheckOobReportingPeriod(Duration::Seconds(10));
  }
}
// When both application and CPU utilization are present in out-of-band
// reports, the expected weights follow application utilization; the CPU
// values are deliberately set to unrelated numbers.
TEST_F(WeightedRoundRobinTest, OobReportingAppUtilOverCpuUtil) {
  // Hand the three backends to the LB policy with OOB load reporting enabled.
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = SendInitialUpdateAndWaitForConnected(
      kAddresses, ConfigBuilder().SetEnableOobLoadReport(true));
  ASSERT_NE(picker, nullptr);
  // Round 1 reports: application utilization 0.9 vs 0.3.
  const BackendMetricData first_round_addr0 = MakeBackendMetricData(
      /*app_utilization=*/0.9, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.3);
  const BackendMetricData first_round_addr1 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.4);
  // Address 0 ends up with weight 1 and address 1 with weight 3.
  // Address 2 reports nothing and therefore gets the average weight 2.
  ReportOobBackendMetrics({{kAddresses[0], first_round_addr0},
                           {kAddresses[1], first_round_addr1}});
  WaitForWeightedRoundRobinPicks(
      &picker, {},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
  // Round 2 reports: same application utilization, different CPU values, and
  // address 2 now reports the same application utilization as address 1.
  const BackendMetricData second_round_addr0 = MakeBackendMetricData(
      /*app_utilization=*/0.9, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.2);
  const BackendMetricData second_round_addr1 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.6);
  const BackendMetricData second_round_addr2 = MakeBackendMetricData(
      /*app_utilization=*/0.3, /*qps=*/100.0, /*eps=*/0.0,
      /*cpu_utilization=*/0.5);
  ReportOobBackendMetrics({{kAddresses[0], second_round_addr0},
                           {kAddresses[1], second_round_addr1},
                           {kAddresses[2], second_round_addr2}});
  WaitForWeightedRoundRobinPicks(
      &picker, {},
      {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
  // Every subchannel must still use the default OOB reporting interval.
  for (const auto& addr : kAddresses) {
    auto* sc = FindSubchannel(addr);
    ASSERT_NE(sc, nullptr);
    sc->CheckOobReportingPeriod(Duration::Seconds(10));
  }
}
TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
@ -461,11 +623,11 @@ TEST_F(WeightedRoundRobinTest, HonorsOobReportingPeriod) {
Duration::Seconds(5)));
ASSERT_NE(picker, nullptr);
ReportOobBackendMetrics(
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}});
WaitForWeightedRoundRobinPicks(
&picker, {},
@ -486,11 +648,11 @@ TEST_F(WeightedRoundRobinTest, HonorsWeightUpdatePeriod) {
ASSERT_NE(picker, nullptr);
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
@ -505,11 +667,11 @@ TEST_F(WeightedRoundRobinTest, WeightUpdatePeriodLowerBound) {
ASSERT_NE(picker, nullptr);
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
@ -525,11 +687,11 @@ TEST_F(WeightedRoundRobinTest, WeightExpirationPeriod) {
// All backends report weights.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
@ -553,11 +715,11 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
// All backends report weights.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time to make weights stale and trigger the timer callback
@ -572,11 +734,11 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterWeightExpiration) {
// because we're still in the blackout period.
ExpectWeightedRoundRobinPicks(
picker.get(),
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Advance time past the blackout period. This should cause the
@ -599,11 +761,11 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
// All backends report weights.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Trigger disconnection and reconnection on address 2.
@ -619,11 +781,11 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
picker = ExpectState(GRPC_CHANNEL_READY, absl::OkStatus());
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Advance time to exceed the blackout period and trigger the timer
@ -632,11 +794,11 @@ TEST_F(WeightedRoundRobinTest, BlackoutPeriodAfterDisconnect) {
RunTimerCallback();
ExpectWeightedRoundRobinPicks(
picker.get(),
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.3,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.9,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
}
@ -651,11 +813,11 @@ TEST_F(WeightedRoundRobinTest, ZeroErrorUtilPenalty) {
// Expected weights: 1:1:1
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/50.0)},
{kAddresses[1], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/20.0)},
{kAddresses[2], MakeBackendMetricData(/*cpu_utilization=*/0.1,
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.1,
/*qps=*/100.0, /*eps=*/10.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 1}, {kAddresses[2], 1}});
}

@ -129,6 +129,10 @@ class MyTestServiceImpl : public TestServiceImpl {
auto* recorder = context->ExperimentalGetCallMetricRecorder();
EXPECT_NE(recorder, nullptr);
// Do not record when zero since it indicates no test per-call report.
if (request_metrics.application_utilization() > 0) {
recorder->RecordApplicationUtilizationMetric(
request_metrics.application_utilization());
}
if (request_metrics.cpu_utilization() > 0) {
recorder->RecordCpuUtilizationMetric(request_metrics.cpu_utilization());
}
@ -2350,6 +2354,10 @@ class OrcaLoadReportBuilder {
OrcaLoadReportBuilder() = default;
explicit OrcaLoadReportBuilder(const OrcaLoadReport& report)
: report_(report) {}
// Writes `v` into the application_utilization field of the report being
// built and hands back this builder so further setters can be chained.
OrcaLoadReportBuilder& SetApplicationUtilization(double v) {
  OrcaLoadReportBuilder& self = *this;
  self.report_.set_application_utilization(v);
  return self;
}
OrcaLoadReportBuilder& SetCpuUtilization(double v) {
report_.set_cpu_utilization(v);
return *this;
@ -2391,6 +2399,8 @@ class OrcaLoadReportBuilder {
OrcaLoadReport BackendMetricDataToOrcaLoadReport(
const grpc_core::BackendMetricData& backend_metric_data) {
auto builder = OrcaLoadReportBuilder()
.SetApplicationUtilization(
backend_metric_data.application_utilization)
.SetCpuUtilization(backend_metric_data.cpu_utilization)
.SetMemUtilization(backend_metric_data.mem_utilization)
.SetQps(backend_metric_data.qps)
@ -2411,6 +2421,8 @@ OrcaLoadReport BackendMetricDataToOrcaLoadReport(
// OSS.
void CheckLoadReportAsExpected(const OrcaLoadReport& actual,
const OrcaLoadReport& expected) {
EXPECT_EQ(actual.application_utilization(),
expected.application_utilization());
EXPECT_EQ(actual.cpu_utilization(), expected.cpu_utilization());
EXPECT_EQ(actual.mem_utilization(), expected.mem_utilization());
EXPECT_EQ(actual.rps_fractional(), expected.rps_fractional());
@ -2671,6 +2683,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, InterceptsRetriesEnabled) {
TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
.SetApplicationUtilization(0.25)
.SetCpuUtilization(0.5)
.SetMemUtilization(0.75)
.SetQps(0.25)
@ -2683,6 +2696,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
.SetNamedMetrics("metric1", -1.0)
.Build(),
OrcaLoadReportBuilder()
.SetApplicationUtilization(0.25)
.SetCpuUtilization(0.5)
.SetMemUtilization(0.75)
.SetQps(0.25)
@ -2698,6 +2712,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, Valid) {
TEST_F(ClientLbInterceptTrailingMetadataTest, NegativeValues) {
RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
.SetApplicationUtilization(-0.3)
.SetCpuUtilization(-0.1)
.SetMemUtilization(-0.2)
.SetQps(-3)
@ -2714,6 +2729,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, NegativeValues) {
TEST_F(ClientLbInterceptTrailingMetadataTest, AboveOneUtilization) {
RunPerRpcMetricReportingTest(OrcaLoadReportBuilder()
.SetApplicationUtilization(1.9)
.SetCpuUtilization(1.1)
.SetMemUtilization(2)
.SetQps(3)
@ -2721,6 +2737,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, AboveOneUtilization) {
.SetUtilization("foo", 5)
.Build(),
OrcaLoadReportBuilder()
.SetApplicationUtilization(1.9)
.SetCpuUtilization(1.1)
.SetQps(3)
.SetEps(4)
@ -2731,6 +2748,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
const int kNumServers = 1;
const int kNumRpcs = 10;
StartServers(kNumServers);
servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.99);
servers_[0]->server_metric_recorder_->SetCpuUtilization(0.99);
servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.99);
servers_[0]->server_metric_recorder_->SetQps(0.99);
@ -2738,6 +2756,7 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
servers_[0]->server_metric_recorder_->SetNamedUtilization("foo", 0.99);
servers_[0]->server_metric_recorder_->SetNamedUtilization("bar", 0.1);
OrcaLoadReport per_server_load = OrcaLoadReportBuilder()
.SetApplicationUtilization(0.99)
.SetCpuUtilization(0.99)
.SetMemUtilization(0.99)
.SetQps(0.99)
@ -2753,9 +2772,10 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricDataMerge) {
size_t total_num_rpcs = 0;
{
OrcaLoadReport load_report =
OrcaLoadReportBuilder().SetCpuUtilization(0.5).Build();
OrcaLoadReport expected =
OrcaLoadReportBuilder(per_server_load).SetCpuUtilization(0.5).Build();
OrcaLoadReportBuilder().SetApplicationUtilization(0.5).Build();
OrcaLoadReport expected = OrcaLoadReportBuilder(per_server_load)
.SetApplicationUtilization(0.5)
.Build();
for (size_t i = 0; i < kNumRpcs; ++i) {
CheckRpcSendOk(DEBUG_LOCATION, stub, false, &load_report);
auto actual = backend_load_report();
@ -2974,6 +2994,7 @@ TEST_F(OobBackendMetricTest, Basic) {
StartServers(1);
// Set initial backend metric data on server.
constexpr char kMetricName[] = "foo";
servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.5);
servers_[0]->server_metric_recorder_->SetCpuUtilization(0.1);
servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.2);
servers_[0]->server_metric_recorder_->SetEps(0.3);
@ -2995,6 +3016,7 @@ TEST_F(OobBackendMetricTest, Basic) {
auto report = GetBackendMetricReport();
if (report.has_value()) {
EXPECT_EQ(report->first, servers_[0]->port_);
EXPECT_EQ(report->second.application_utilization(), 0.5);
EXPECT_EQ(report->second.cpu_utilization(), 0.1);
EXPECT_EQ(report->second.mem_utilization(), 0.2);
EXPECT_EQ(report->second.eps(), 0.3);
@ -3011,19 +3033,21 @@ TEST_F(OobBackendMetricTest, Basic) {
// Now update the utilization data on the server.
// Note that the server may send a new report while we're updating these,
// so we set them in reverse order, so that we know we'll get all new
// data once we see a report with the new CPU utilization value.
// data once we see a report with the new app utilization value.
servers_[0]->server_metric_recorder_->SetNamedUtilization(kMetricName, 0.7);
servers_[0]->server_metric_recorder_->SetEps(0.6);
servers_[0]->server_metric_recorder_->SetQps(0.8);
servers_[0]->server_metric_recorder_->SetEps(0.6);
servers_[0]->server_metric_recorder_->SetMemoryUtilization(0.5);
servers_[0]->server_metric_recorder_->SetCpuUtilization(2.4);
servers_[0]->server_metric_recorder_->SetApplicationUtilization(1.2);
// Wait for client to see new report.
report_seen = false;
for (size_t i = 0; i < 5; ++i) {
auto report = GetBackendMetricReport();
if (report.has_value()) {
EXPECT_EQ(report->first, servers_[0]->port_);
if (report->second.cpu_utilization() != 0.1) {
if (report->second.application_utilization() != 0.5) {
EXPECT_EQ(report->second.application_utilization(), 1.2);
EXPECT_EQ(report->second.cpu_utilization(), 2.4);
EXPECT_EQ(report->second.mem_utilization(), 0.5);
EXPECT_EQ(report->second.eps(), 0.6);
@ -3194,15 +3218,16 @@ TEST_F(WeightedRoundRobinTest, CallAndServerMetric) {
const int kNumServers = 3;
StartServers(kNumServers);
// Report server metrics that should give 6:4:3 WRR picks.
// weights = qps / (cpu_util + (eps/qps)) =
// weights = qps / (util + (eps/qps)) =
// 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
servers_[0]->server_metric_recorder_->SetCpuUtilization(0.2);
// where util is app_util if set, or cpu_util.
servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.2);
servers_[0]->server_metric_recorder_->SetEps(20);
servers_[0]->server_metric_recorder_->SetQps(100);
servers_[1]->server_metric_recorder_->SetCpuUtilization(0.3);
servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.3);
servers_[1]->server_metric_recorder_->SetEps(30);
servers_[1]->server_metric_recorder_->SetQps(100);
servers_[2]->server_metric_recorder_->SetCpuUtilization(1.5);
servers_[2]->server_metric_recorder_->SetApplicationUtilization(1.5);
servers_[2]->server_metric_recorder_->SetEps(20);
servers_[2]->server_metric_recorder_->SetQps(200);
// Create channel.
@ -3241,15 +3266,16 @@ TEST_P(WeightedRoundRobinParamTest, Basic) {
const int kNumServers = 3;
StartServers(kNumServers);
// Report server metrics that should give 1:2:4 WRR picks.
// weights = qps / (cpu_util + (eps/qps)) =
// weights = qps / (util + (eps/qps)) =
// 1/(0.4+0.4) : 1/(0.2+0.2) : 2/(0.3+0.1) = 1:2:4
servers_[0]->server_metric_recorder_->SetCpuUtilization(0.4);
// where util is app_util if set, or cpu_util.
servers_[0]->server_metric_recorder_->SetApplicationUtilization(0.4);
servers_[0]->server_metric_recorder_->SetEps(40);
servers_[0]->server_metric_recorder_->SetQps(100);
servers_[1]->server_metric_recorder_->SetCpuUtilization(0.2);
servers_[1]->server_metric_recorder_->SetApplicationUtilization(0.2);
servers_[1]->server_metric_recorder_->SetEps(20);
servers_[1]->server_metric_recorder_->SetQps(100);
servers_[2]->server_metric_recorder_->SetCpuUtilization(0.3);
servers_[2]->server_metric_recorder_->SetApplicationUtilization(0.3);
servers_[2]->server_metric_recorder_->SetEps(5);
servers_[2]->server_metric_recorder_->SetQps(200);
// Create channel.

@ -141,30 +141,36 @@ TEST_F(OrcaServiceEnd2endTest, Basic) {
};
// Initial response should not have any values populated.
ReadResponses([](const OrcaLoadReport& response) {
EXPECT_EQ(response.application_utilization(), 0);
EXPECT_EQ(response.cpu_utilization(), 0);
EXPECT_EQ(response.mem_utilization(), 0);
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
});
// Now set CPU utilization on the server.
server_metric_recorder_->SetCpuUtilization(0.5);
// Now set app utilization on the server.
server_metric_recorder_->SetApplicationUtilization(0.5);
ReadResponses([](const OrcaLoadReport& response) {
EXPECT_EQ(response.cpu_utilization(), 0.5);
EXPECT_EQ(response.application_utilization(), 0.5);
EXPECT_EQ(response.cpu_utilization(), 0);
EXPECT_EQ(response.mem_utilization(), 0);
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
});
// Update CPU utilization and set memory utilization.
server_metric_recorder_->SetCpuUtilization(1.8);
// Update app utilization and set CPU and memory utilization.
server_metric_recorder_->SetApplicationUtilization(1.8);
server_metric_recorder_->SetCpuUtilization(0.3);
server_metric_recorder_->SetMemoryUtilization(0.4);
ReadResponses([](const OrcaLoadReport& response) {
EXPECT_EQ(response.cpu_utilization(), 1.8);
EXPECT_EQ(response.application_utilization(), 1.8);
EXPECT_EQ(response.cpu_utilization(), 0.3);
EXPECT_EQ(response.mem_utilization(), 0.4);
EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
});
// Unset CPU and memory utilization and set a named utilization.
// Unset app, CPU, and memory utilization and set a named utilization.
server_metric_recorder_->ClearApplicationUtilization();
server_metric_recorder_->ClearCpuUtilization();
server_metric_recorder_->ClearMemoryUtilization();
server_metric_recorder_->SetNamedUtilization(kMetricName1, 0.3);
ReadResponses([&](const OrcaLoadReport& response) {
EXPECT_EQ(response.application_utilization(), 0);
EXPECT_EQ(response.cpu_utilization(), 0);
EXPECT_EQ(response.mem_utilization(), 0);
EXPECT_THAT(
@ -176,6 +182,7 @@ TEST_F(OrcaServiceEnd2endTest, Basic) {
server_metric_recorder_->SetNamedUtilization(kMetricName2, 0.2);
server_metric_recorder_->SetNamedUtilization(kMetricName3, 0.1);
ReadResponses([&](const OrcaLoadReport& response) {
EXPECT_EQ(response.application_utilization(), 0);
EXPECT_EQ(response.cpu_utilization(), 0);
EXPECT_EQ(response.mem_utilization(), 0);
EXPECT_THAT(
@ -187,6 +194,7 @@ TEST_F(OrcaServiceEnd2endTest, Basic) {
server_metric_recorder_->SetAllNamedUtilization(
{{kMetricName2, 0.5}, {kMetricName4, 0.9}});
ReadResponses([&](const OrcaLoadReport& response) {
EXPECT_EQ(response.application_utilization(), 0);
EXPECT_EQ(response.cpu_utilization(), 0);
EXPECT_EQ(response.mem_utilization(), 0);
EXPECT_THAT(

@ -40,27 +40,24 @@ using ::envoy::extensions::load_balancing_policies::
client_side_weighted_round_robin::v3::ClientSideWeightedRoundRobin;
using ::envoy::extensions::load_balancing_policies::wrr_locality::v3::
WrrLocality;
using ::grpc_core::testing::ScopedExperimentalEnvVar;
using WrrTest = XdsEnd2endTest;
INSTANTIATE_TEST_SUITE_P(XdsTest, WrrTest, ::testing::Values(XdsTestType()),
&XdsTestType::Name);
TEST_P(WrrTest, Basic) {
ScopedExperimentalEnvVar env_var2("GRPC_EXPERIMENTAL_XDS_WRR_LB");
CreateAndStartBackends(3);
// Expected weights = qps / (cpu_util + (eps/qps)) =
// Expected weights = qps / (util + (eps/qps)) =
// 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
backends_[0]->server_metric_recorder()->SetQps(100);
backends_[0]->server_metric_recorder()->SetEps(20);
backends_[0]->server_metric_recorder()->SetCpuUtilization(0.2);
backends_[0]->server_metric_recorder()->SetApplicationUtilization(0.2);
backends_[1]->server_metric_recorder()->SetQps(100);
backends_[1]->server_metric_recorder()->SetEps(30);
backends_[1]->server_metric_recorder()->SetCpuUtilization(0.3);
backends_[1]->server_metric_recorder()->SetApplicationUtilization(0.3);
backends_[2]->server_metric_recorder()->SetQps(200);
backends_[2]->server_metric_recorder()->SetEps(20);
backends_[2]->server_metric_recorder()->SetCpuUtilization(1.5);
backends_[2]->server_metric_recorder()->SetApplicationUtilization(1.5);
auto cluster = default_cluster_;
WrrLocality wrr_locality;
wrr_locality.mutable_endpoint_picking_policy()

2
third_party/xds vendored

@ -1 +1 @@
Subproject commit 4003588d1b747e37e911baa5a9c1c07fde4ca518
Subproject commit e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7

@ -38,7 +38,7 @@ third_party/opencensus-proto 4aa53e15cbf1a47bc9087e6cfdca214c1eea4e89
third_party/opentelemetry 60fa8754d890b5c55949a8c68dcfd7ab5c2395df
third_party/protobuf 2dca62f7296e5b49d729f7384f975cecb38382a0
third_party/re2 0c5616df9c0aaa44c9440d87422012423d91c7d1
third_party/xds 4003588d1b747e37e911baa5a9c1c07fde4ca518
third_party/xds e9ce68804cb4e64cab5a52e3c8baf840d4ff87b7
third_party/zlib 04f42ceca40f73e2978b50e93806c2a18c1281fc
EOF

Loading…
Cancel
Save