[WRR] implement non-per-call metrics (#35977)

As per gRFC A78 (https://github.com/grpc/proposal/pull/419).

Note that these new metrics are populated only if the `wrr_delegate_to_pick_first` experiment is enabled, which is the case by default.

Closes #35977

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35977 from markdroth:non_per_call_metrics_wrr 7acea32fc4
PiperOrigin-RevId: 610760874
pull/35984/head^2
Mark D. Roth 9 months ago committed by Copybara-Service
parent 571da7be78
commit 40577dd585
  1. 1
      BUILD
  2. 8
      CMakeLists.txt
  3. 2
      Makefile
  4. 3
      Package.swift
  5. 20
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 4
      gRPC-C++.podspec
  9. 5
      gRPC-Core.podspec
  10. 3
      grpc.gemspec
  11. 3
      grpc.gyp
  12. 3
      package.xml
  13. 6
      src/core/BUILD
  14. 17
      src/core/client_channel/client_channel_filter.cc
  15. 1
      src/core/client_channel/client_channel_filter.h
  16. 2
      src/core/lib/channel/channel_stack.cc
  17. 5
      src/core/lib/channel/channel_stack.h
  18. 10
      src/core/lib/channel/metrics.cc
  19. 17
      src/core/lib/channel/metrics.h
  20. 7
      src/core/lib/surface/channel.cc
  21. 8
      src/core/load_balancing/delegating_helper.h
  22. 8
      src/core/load_balancing/lb_policy.h
  23. 89
      src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc
  24. 9
      src/core/load_balancing/weighted_target/weighted_target.cc
  25. 28
      src/core/load_balancing/weighted_target/weighted_target.h
  26. 1
      src/python/grpcio/grpc_core_dependencies.py
  27. 1
      test/core/channel/BUILD
  28. 290
      test/core/channel/metrics_test.cc
  29. 2
      test/core/client_channel/lb_policy/BUILD
  30. 12
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  31. 177
      test/core/client_channel/lb_policy/weighted_round_robin_test.cc
  32. 11
      test/core/util/BUILD
  33. 105
      test/core/util/fake_stats_plugin.cc
  34. 318
      test/core/util/fake_stats_plugin.h
  35. 1
      test/cpp/end2end/xds/BUILD
  36. 59
      test/cpp/end2end/xds/xds_wrr_end2end_test.cc
  37. 3
      tools/doxygen/Doxyfile.c++.internal
  38. 3
      tools/doxygen/Doxyfile.core.internal

@ -1817,6 +1817,7 @@ grpc_cc_library(
"//src/core:message",
"//src/core:metadata",
"//src/core:metadata_batch",
"//src/core:metrics",
"//src/core:no_destruct",
"//src/core:per_cpu",
"//src/core:pipe",

8
CMakeLists.txt generated

@ -2210,6 +2210,7 @@ add_library(grpc
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/metrics.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
@ -2977,6 +2978,7 @@ add_library(grpc_unsecure
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/metrics.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
@ -5103,6 +5105,7 @@ add_library(grpc_authorization_provider
src/core/lib/channel/channelz.cc
src/core/lib/channel/channelz_registry.cc
src/core/lib/channel/connected_channel.cc
src/core/lib/channel/metrics.cc
src/core/lib/channel/promise_based_filter.cc
src/core/lib/channel/server_call_tracer_filter.cc
src/core/lib/channel/status_util.cc
@ -19563,8 +19566,8 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(metrics_test
src/core/lib/channel/metrics.cc
test/core/channel/metrics_test.cc
test/core/util/fake_stats_plugin.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
@ -31749,6 +31752,7 @@ if(gRPC_BUILD_TESTS)
add_executable(weighted_round_robin_config_test
test/core/client_channel/lb_policy/weighted_round_robin_config_test.cc
test/core/util/fake_stats_plugin.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
@ -31797,6 +31801,7 @@ add_executable(weighted_round_robin_test
test/core/client_channel/lb_policy/weighted_round_robin_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/util/fake_stats_plugin.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
@ -36106,6 +36111,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h
test/core/util/fake_stats_plugin.cc
test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_end2end_test_lib.cc
test/cpp/end2end/xds/xds_server.cc

2
Makefile generated

@ -1391,6 +1391,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/metrics.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \
@ -1992,6 +1993,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/metrics.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \

3
Package.swift generated

@ -1155,6 +1155,8 @@ let package = Package(
"src/core/lib/channel/connected_channel.cc",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/metrics.cc",
"src/core/lib/channel/metrics.h",
"src/core/lib/channel/promise_based_filter.cc",
"src/core/lib/channel/promise_based_filter.h",
"src/core/lib/channel/server_call_tracer_filter.cc",
@ -1890,6 +1892,7 @@ let package = Package(
"src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h",
"src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc",
"src/core/load_balancing/weighted_target/weighted_target.cc",
"src/core/load_balancing/weighted_target/weighted_target.h",
"src/core/load_balancing/xds/cds.cc",
"src/core/load_balancing/xds/xds_channel_args.h",
"src/core/load_balancing/xds/xds_cluster_impl.cc",

@ -823,6 +823,7 @@ libs:
- src/core/lib/channel/channelz_registry.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/metrics.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/channel/tcp_tracer.h
@ -1177,6 +1178,7 @@ libs:
- src/core/load_balancing/subchannel_interface.h
- src/core/load_balancing/subchannel_list.h
- src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h
- src/core/load_balancing/weighted_target/weighted_target.h
- src/core/load_balancing/xds/xds_channel_args.h
- src/core/load_balancing/xds/xds_override_host.h
- src/core/resolver/dns/c_ares/dns_resolver_ares.h
@ -1665,6 +1667,7 @@ libs:
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/metrics.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
@ -2319,6 +2322,7 @@ libs:
- src/core/lib/channel/channelz_registry.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/metrics.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/channel/tcp_tracer.h
@ -2636,6 +2640,7 @@ libs:
- src/core/load_balancing/subchannel_interface.h
- src/core/load_balancing/subchannel_list.h
- src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h
- src/core/load_balancing/weighted_target/weighted_target.h
- src/core/resolver/dns/c_ares/dns_resolver_ares.h
- src/core/resolver/dns/c_ares/grpc_ares_ev_driver.h
- src/core/resolver/dns/c_ares/grpc_ares_wrapper.h
@ -2784,6 +2789,7 @@ libs:
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/metrics.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
@ -4393,6 +4399,7 @@ libs:
- src/core/lib/channel/channelz_registry.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/metrics.h
- src/core/lib/channel/promise_based_filter.h
- src/core/lib/channel/status_util.h
- src/core/lib/channel/tcp_tracer.h
@ -4741,6 +4748,7 @@ libs:
- src/core/lib/channel/channelz.cc
- src/core/lib/channel/channelz_registry.cc
- src/core/lib/channel/connected_channel.cc
- src/core/lib/channel/metrics.cc
- src/core/lib/channel/promise_based_filter.cc
- src/core/lib/channel/server_call_tracer_filter.cc
- src/core/lib/channel/status_util.cc
@ -12714,10 +12722,10 @@ targets:
build: test
language: c++
headers:
- src/core/lib/channel/metrics.h
- test/core/util/fake_stats_plugin.h
src:
- src/core/lib/channel/metrics.cc
- test/core/channel/metrics_test.cc
- test/core/util/fake_stats_plugin.cc
deps:
- gtest
- grpc_test_util
@ -19564,9 +19572,11 @@ targets:
gtest: true
build: test
language: c++
headers: []
headers:
- test/core/util/fake_stats_plugin.h
src:
- test/core/client_channel/lb_policy/weighted_round_robin_config_test.cc
- test/core/util/fake_stats_plugin.cc
deps:
- gtest
- grpc_test_util
@ -19579,11 +19589,13 @@ targets:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/util/fake_stats_plugin.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/weighted_round_robin_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/util/fake_stats_plugin.cc
deps:
- gtest
- protobuf
@ -21224,6 +21236,7 @@ targets:
build: test
language: c++
headers:
- test/core/util/fake_stats_plugin.h
- test/core/util/scoped_env_var.h
- test/cpp/end2end/counted_service.h
- test/cpp/end2end/test_service_impl.h
@ -21265,6 +21278,7 @@ targets:
- src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto
- src/proto/grpc/testing/xds/v3/wrr_locality.proto
- test/core/util/fake_stats_plugin.cc
- test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_end2end_test_lib.cc
- test/cpp/end2end/xds/xds_server.cc

1
config.m4 generated

@ -474,6 +474,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/channelz.cc \
src/core/lib/channel/channelz_registry.cc \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/metrics.cc \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/server_call_tracer_filter.cc \
src/core/lib/channel/status_util.cc \

1
config.w32 generated

@ -439,6 +439,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\channel\\channelz.cc " +
"src\\core\\lib\\channel\\channelz_registry.cc " +
"src\\core\\lib\\channel\\connected_channel.cc " +
"src\\core\\lib\\channel\\metrics.cc " +
"src\\core\\lib\\channel\\promise_based_filter.cc " +
"src\\core\\lib\\channel\\server_call_tracer_filter.cc " +
"src\\core\\lib\\channel\\status_util.cc " +

4
gRPC-C++.podspec generated

@ -903,6 +903,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channelz_registry.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/metrics.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/channel/tcp_tracer.h',
@ -1282,6 +1283,7 @@ Pod::Spec.new do |s|
'src/core/load_balancing/subchannel_interface.h',
'src/core/load_balancing/subchannel_list.h',
'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h',
'src/core/load_balancing/weighted_target/weighted_target.h',
'src/core/load_balancing/xds/xds_channel_args.h',
'src/core/load_balancing/xds/xds_override_host.h',
'src/core/resolver/dns/c_ares/dns_resolver_ares.h',
@ -2159,6 +2161,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channelz_registry.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/metrics.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/channel/tcp_tracer.h',
@ -2538,6 +2541,7 @@ Pod::Spec.new do |s|
'src/core/load_balancing/subchannel_interface.h',
'src/core/load_balancing/subchannel_list.h',
'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h',
'src/core/load_balancing/weighted_target/weighted_target.h',
'src/core/load_balancing/xds/xds_channel_args.h',
'src/core/load_balancing/xds/xds_override_host.h',
'src/core/resolver/dns/c_ares/dns_resolver_ares.h',

5
gRPC-Core.podspec generated

@ -1268,6 +1268,8 @@ Pod::Spec.new do |s|
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/metrics.cc',
'src/core/lib/channel/metrics.h',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/server_call_tracer_filter.cc',
@ -1999,6 +2001,7 @@ Pod::Spec.new do |s|
'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h',
'src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc',
'src/core/load_balancing/weighted_target/weighted_target.cc',
'src/core/load_balancing/weighted_target/weighted_target.h',
'src/core/load_balancing/xds/cds.cc',
'src/core/load_balancing/xds/xds_channel_args.h',
'src/core/load_balancing/xds/xds_cluster_impl.cc',
@ -2938,6 +2941,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channelz_registry.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/metrics.h',
'src/core/lib/channel/promise_based_filter.h',
'src/core/lib/channel/status_util.h',
'src/core/lib/channel/tcp_tracer.h',
@ -3317,6 +3321,7 @@ Pod::Spec.new do |s|
'src/core/load_balancing/subchannel_interface.h',
'src/core/load_balancing/subchannel_list.h',
'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h',
'src/core/load_balancing/weighted_target/weighted_target.h',
'src/core/load_balancing/xds/xds_channel_args.h',
'src/core/load_balancing/xds/xds_override_host.h',
'src/core/resolver/dns/c_ares/dns_resolver_ares.h',

3
grpc.gemspec generated

@ -1161,6 +1161,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/connected_channel.cc )
s.files += %w( src/core/lib/channel/connected_channel.h )
s.files += %w( src/core/lib/channel/context.h )
s.files += %w( src/core/lib/channel/metrics.cc )
s.files += %w( src/core/lib/channel/metrics.h )
s.files += %w( src/core/lib/channel/promise_based_filter.cc )
s.files += %w( src/core/lib/channel/promise_based_filter.h )
s.files += %w( src/core/lib/channel/server_call_tracer_filter.cc )
@ -1892,6 +1894,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h )
s.files += %w( src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc )
s.files += %w( src/core/load_balancing/weighted_target/weighted_target.cc )
s.files += %w( src/core/load_balancing/weighted_target/weighted_target.h )
s.files += %w( src/core/load_balancing/xds/cds.cc )
s.files += %w( src/core/load_balancing/xds/xds_channel_args.h )
s.files += %w( src/core/load_balancing/xds/xds_cluster_impl.cc )

3
grpc.gyp generated

@ -705,6 +705,7 @@
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/metrics.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
@ -1246,6 +1247,7 @@
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/metrics.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',
@ -2054,6 +2056,7 @@
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/metrics.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',

3
package.xml generated

@ -1143,6 +1143,8 @@
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/metrics.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/metrics.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/promise_based_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/promise_based_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/server_call_tracer_filter.cc" role="src" />
@ -1874,6 +1876,7 @@
<file baseinstalldir="/" name="src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/weighted_target/weighted_target.cc" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/weighted_target/weighted_target.h" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/xds/cds.cc" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/xds/xds_channel_args.h" role="src" />
<file baseinstalldir="/" name="src/core/load_balancing/xds/xds_cluster_impl.cc" role="src" />

@ -3256,6 +3256,7 @@ grpc_cc_library(
"error",
"grpc_backend_metric_data",
"iomgr_fwd",
"metrics",
"pollset_set",
"ref_counted",
"resolved_address",
@ -5680,6 +5681,7 @@ grpc_cc_library(
"connectivity_state",
"experiments",
"grpc_backend_metric_data",
"grpc_lb_policy_weighted_target",
"grpc_lb_subchannel_list",
"json",
"json_args",
@ -5687,6 +5689,7 @@ grpc_cc_library(
"lb_endpoint_list",
"lb_policy",
"lb_policy_factory",
"metrics",
"ref_counted",
"resolved_address",
"static_stride_scheduler",
@ -5826,6 +5829,9 @@ grpc_cc_library(
srcs = [
"load_balancing/weighted_target/weighted_target.cc",
],
hdrs = [
"load_balancing/weighted_target/weighted_target.h",
],
external_deps = [
"absl/base:core_headers",
"absl/meta:type_traits",

@ -1148,6 +1148,8 @@ class ClientChannelFilter::ClientChannelControlHelper
chand_->resolver_->RequestReresolutionLocked();
}
absl::string_view GetTarget() override { return chand_->target_uri_; }
absl::string_view GetAuthority() override {
return chand_->default_authority_;
}
@ -1166,6 +1168,10 @@ class ClientChannelFilter::ClientChannelControlHelper
return chand_->owning_stack_->EventEngine();
}
GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
return *chand_->owning_stack_->stats_plugin_group;
}
void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return; // Shutting down.
@ -1270,18 +1276,19 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
}
default_service_config_ = std::move(*service_config);
// Get URI to resolve, using proxy mapper if needed.
absl::optional<std::string> server_uri =
absl::optional<std::string> target_uri =
channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
if (!server_uri.has_value()) {
if (!target_uri.has_value()) {
*error = GRPC_ERROR_CREATE(
"target URI channel arg missing or wrong type in client channel "
"filter");
return;
}
target_uri_ = std::move(*target_uri);
uri_to_resolve_ = CoreConfiguration::Get()
.proxy_mapper_registry()
.MapName(*server_uri, &channel_args_)
.value_or(*server_uri);
.MapName(target_uri_, &channel_args_)
.value_or(target_uri_);
// Make sure the URI to resolve is valid, so that we know that
// resolver creation will succeed later.
if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
@ -1306,7 +1313,7 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
if (!default_authority.has_value()) {
default_authority_ =
CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
*server_uri);
target_uri_);
} else {
default_authority_ = std::move(*default_authority);
}

@ -295,6 +295,7 @@ class ClientChannelFilter {
grpc_channel_stack* owning_stack_;
ClientChannelFactory* client_channel_factory_;
RefCountedPtr<ServiceConfig> default_service_config_;
std::string target_uri_;
std::string uri_to_resolve_;
std::string default_authority_;
channelz::ChannelNode* channelz_node_;

@ -129,6 +129,7 @@ grpc_error_handle grpc_channel_stack_init(
stack->on_destroy.Init([]() {});
stack->event_engine.Init(channel_args.GetObjectRef<EventEngine>());
stack->stats_plugin_group.Init();
size_t call_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
@ -188,6 +189,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) {
(*stack->on_destroy)();
stack->on_destroy.Destroy();
stack->event_engine.Destroy();
stack->stats_plugin_group.Destroy();
}
grpc_error_handle grpc_call_stack_init(

@ -61,6 +61,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/manual_constructor.h"
@ -219,6 +220,10 @@ struct grpc_channel_stack {
return event_engine->get();
}
grpc_core::ManualConstructor<
grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup>
stats_plugin_group;
// Minimal infrastructure to act like a RefCounted thing without converting
// everything.
// It's likely that we'll want to replace grpc_channel_stack with something

@ -21,18 +21,17 @@
#include "src/core/lib/gprpp/crash.h"
namespace grpc_core {
namespace {
// Uses the Construct-on-First-Use idiom to avoid the static initialization
// order fiasco.
absl::flat_hash_map<absl::string_view,
GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>&
GetInstrumentList() {
GlobalInstrumentsRegistry::GetInstrumentList() {
static NoDestruct<absl::flat_hash_map<
absl::string_view, GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>>
instruments;
return *instruments;
}
} // namespace
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle
GlobalInstrumentsRegistry::RegisterUInt64Counter(
@ -157,11 +156,6 @@ void GlobalInstrumentsRegistry::ForEach(
}
}
void GlobalInstrumentsRegistry::TestOnlyResetGlobalInstrumentsRegistry() {
auto& instruments = GetInstrumentList();
instruments.clear();
}
NoDestruct<Mutex> GlobalStatsPluginRegistry::mutex_;
NoDestruct<std::vector<std::shared_ptr<StatsPlugin>>>
GlobalStatsPluginRegistry::plugins_;

@ -21,6 +21,7 @@
#include <memory>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
@ -33,6 +34,8 @@
namespace grpc_core {
constexpr absl::string_view kMetricLabelTarget = "grpc.target";
// A global registry of instruments(metrics). This API is designed to be used to
// register instruments (Counter and Histogram) as part of program startup,
// before the execution of the main function (during dynamic initialization
@ -97,9 +100,14 @@ class GlobalInstrumentsRegistry {
static void ForEach(
absl::FunctionRef<void(const GlobalInstrumentDescriptor&)> f);
static void TestOnlyResetGlobalInstrumentsRegistry();
private:
friend class GlobalInstrumentsRegistryTestPeer;
GlobalInstrumentsRegistry() = delete;
static absl::flat_hash_map<
absl::string_view, GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>&
GetInstrumentList();
};
// The StatsPlugin interface.
@ -213,12 +221,9 @@ class GlobalStatsPluginRegistry {
// TODO(yijiem): Implement this.
// StatsPluginsGroup GetStatsPluginsForServer(ChannelArgs& args);
static void TestOnlyResetGlobalStatsPluginRegistry() {
MutexLock lock(&*mutex_);
plugins_->clear();
}
private:
friend class GlobalStatsPluginRegistryTestPeer;
GlobalStatsPluginRegistry() = default;
static NoDestruct<Mutex> mutex_;

@ -42,6 +42,7 @@
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
@ -160,6 +161,12 @@ absl::StatusOr<RefCountedPtr<Channel>> Channel::CreateWithBuilder(
*enabled_algorithms_bitset | 1 /* always support no compression */;
}
// TODO(roth): Populate authority after merging
// https://github.com/grpc/grpc/pull/35924.
StatsPlugin::ChannelScope scope(builder->target(), "");
*(*r)->stats_plugin_group =
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
return RefCountedPtr<Channel>(new Channel(
grpc_channel_stack_type_is_client(builder->channel_stack_type()),
builder->IsPromising(), std::string(builder->target()), channel_args,

@ -58,6 +58,10 @@ class LoadBalancingPolicy::DelegatingChannelControlHelper
parent_helper()->RequestReresolution();
}
absl::string_view GetTarget() override {
return parent_helper()->GetTarget();
}
absl::string_view GetAuthority() override {
return parent_helper()->GetAuthority();
}
@ -75,6 +79,10 @@ class LoadBalancingPolicy::DelegatingChannelControlHelper
return parent_helper()->GetEventEngine();
}
GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
return parent_helper()->GetStatsPluginGroup();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_helper()->AddTraceEvent(severity, message);

@ -40,6 +40,7 @@
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
@ -299,6 +300,9 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Requests that the resolver re-resolve.
virtual void RequestReresolution() = 0;
/// Returns the channel target.
virtual absl::string_view GetTarget() = 0;
/// Returns the channel authority.
virtual absl::string_view GetAuthority() = 0;
@ -319,6 +323,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Returns the EventEngine to use for timers and async work.
virtual grpc_event_engine::experimental::EventEngine* GetEventEngine() = 0;
/// Returns the stats plugin group for reporting metrics.
virtual GlobalStatsPluginRegistry::StatsPluginGroup&
GetStatsPluginGroup() = 0;
/// Adds a trace message associated with the channel.
enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR };
virtual void AddTraceEvent(TraceSeverity severity,

@ -51,8 +51,10 @@
#include "src/core/load_balancing/oob_backend_metric.h"
#include "src/core/load_balancing/subchannel_list.h"
#include "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h"
#include "src/core/load_balancing/weighted_target/weighted_target.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
@ -86,6 +88,41 @@ namespace {
constexpr absl::string_view kWeightedRoundRobin = "weighted_round_robin";
constexpr absl::string_view kMetricLabelLocality = "grpc.lb.locality";
const auto kMetricRrFallback =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.lb.wrr.rr_fallback",
"EXPERIMENTAL. Number of scheduler updates in which there were not "
"enough endpoints with valid weight, which caused the WRR policy to "
"fall back to RR behavior.",
"{update}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
const auto kMetricEndpointWeightNotYetUsable =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
"EXPERIMENTAL. Number of endpoints from each scheduler update that "
"don't yet have usable weight information (i.e., either the load "
"report has not yet been received, or it is within the blackout "
"period).",
"{endpoint}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
const auto kMetricEndpointWeightStale =
GlobalInstrumentsRegistry::RegisterUInt64Counter(
"grpc.lb.wrr.endpoint_weight_stale",
"EXPERIMENTAL. Number of endpoints from each scheduler update whose "
"latest weight is older than the expiration period.",
"{endpoint}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
const auto kMetricEndpointWeights =
GlobalInstrumentsRegistry::RegisterDoubleHistogram(
"grpc.lb.wrr.endpoint_weights",
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges. "
"Each bucket will be a counter that is incremented once for every "
"endpoint whose weight is within that range. Note that endpoints "
"without usable weights will have weight 0.",
"{weight}", {kMetricLabelTarget}, {kMetricLabelLocality}, false);
// Config for WRR policy.
class WeightedRoundRobinConfig : public LoadBalancingPolicy::Config {
public:
@ -1021,7 +1058,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
float error_utilization_penalty);
float GetWeight(Timestamp now, Duration weight_expiration_period,
Duration blackout_period);
Duration blackout_period, uint64_t* num_not_yet_usable,
uint64_t* num_stale);
void ResetNonEmptySince();
@ -1215,6 +1253,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
std::map<EndpointAddressSet, EndpointWeight*> endpoint_weight_map_
ABSL_GUARDED_BY(&endpoint_weight_map_mu_);
const absl::string_view locality_name_;
bool shutdown_ = false;
absl::BitGen bit_gen_;
@ -1276,8 +1316,8 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
}
float WeightedRoundRobin::EndpointWeight::GetWeight(
Timestamp now, Duration weight_expiration_period,
Duration blackout_period) {
Timestamp now, Duration weight_expiration_period, Duration blackout_period,
uint64_t* num_not_yet_usable, uint64_t* num_stale) {
MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO,
@ -1294,12 +1334,14 @@ float WeightedRoundRobin::EndpointWeight::GetWeight(
// period, reset non_empty_since_ so that we apply the blackout period
// again if we start getting data again in the future, and return 0.
if (now - last_update_time_ >= weight_expiration_period) {
++*num_stale;
non_empty_since_ = Timestamp::InfFuture();
return 0;
}
// If we don't have at least blackout_period worth of data, return 0.
if (blackout_period > Duration::Zero() &&
now - non_empty_since_ < blackout_period) {
++*num_not_yet_usable;
return 0;
}
// Otherwise, return the weight.
@ -1418,14 +1460,28 @@ size_t WeightedRoundRobin::Picker::PickIndex() {
}
void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
// Build scheduler.
auto& stats_plugins = wrr_->channel_control_helper()->GetStatsPluginGroup();
// Build scheduler, reporting metrics on endpoint weights.
const Timestamp now = Timestamp::Now();
std::vector<float> weights;
weights.reserve(endpoints_.size());
uint64_t num_not_yet_usable = 0;
uint64_t num_stale = 0;
for (const auto& endpoint : endpoints_) {
weights.push_back(endpoint.weight->GetWeight(
now, config_->weight_expiration_period(), config_->blackout_period()));
}
float weight = endpoint.weight->GetWeight(
now, config_->weight_expiration_period(), config_->blackout_period(),
&num_not_yet_usable, &num_stale);
weights.push_back(weight);
stats_plugins.RecordHistogram(
kMetricEndpointWeights, weight,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
}
stats_plugins.AddCounter(
kMetricEndpointWeightNotYetUsable, num_not_yet_usable,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
stats_plugins.AddCounter(
kMetricEndpointWeightStale, num_stale,
{wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this,
absl::StrJoin(weights, " ").c_str());
@ -1440,9 +1496,14 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
gpr_log(GPR_INFO, "[WRR %p picker %p] new scheduler: %p", wrr_.get(),
this, scheduler.get());
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR",
wrr_.get(), this);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR",
wrr_.get(), this);
}
stats_plugins.AddCounter(
kMetricRrFallback, 1, {wrr_->channel_control_helper()->GetTarget()},
{wrr_->locality_name_});
}
{
MutexLock lock(&scheduler_mu_);
@ -1483,9 +1544,13 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
//
WeightedRoundRobin::WeightedRoundRobin(Args args)
: LoadBalancingPolicy(std::move(args)) {
: LoadBalancingPolicy(std::move(args)),
locality_name_(channel_args()
.GetString(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD)
.value_or("")) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] Created", this);
gpr_log(GPR_INFO, "[WRR %p] Created -- locality_name=\"%s\"", this,
std::string(locality_name_).c_str());
}
}

@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include "src/core/load_balancing/weighted_target/weighted_target.h"
#include <string.h>
#include <algorithm>
@ -160,7 +162,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
absl::Status UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args);
const std::string& resolution_note, ChannelArgs args);
void ResetBackoffLocked();
void DeactivateLocked();
@ -592,7 +594,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args) {
const std::string& resolution_note, ChannelArgs args) {
if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.
if (weight_ != config.weight &&
@ -611,6 +613,7 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
delayed_removal_timer_.reset();
}
// Create child policy if needed.
args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_);
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
}
@ -619,7 +622,7 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
update_args.config = config.config;
update_args.addresses = std::move(addresses);
update_args.resolution_note = resolution_note;
update_args.args = args;
update_args.args = std::move(args);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
gpr_log(GPR_INFO,

@ -0,0 +1,28 @@
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H
#define GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H
#include <grpc/support/port_platform.h>
#include "src/core/resolver/endpoint_addresses.h"
// Channel arg key indicating the weighted_target child name.
#define GRPC_ARG_LB_WEIGHTED_TARGET_CHILD \
GRPC_ARG_NO_SUBCHANNEL_PREFIX "lb_weighted_target_child"
#endif // GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H

@ -448,6 +448,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channelz.cc',
'src/core/lib/channel/channelz_registry.cc',
'src/core/lib/channel/connected_channel.cc',
'src/core/lib/channel/metrics.cc',
'src/core/lib/channel/promise_based_filter.cc',
'src/core/lib/channel/server_call_tracer_filter.cc',
'src/core/lib/channel/status_util.cc',

@ -204,6 +204,7 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//src/core:metrics",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util",
],
)

@ -23,299 +23,17 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "test/core/util/fake_stats_plugin.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace {
void AddKeyValuePairs(absl::Span<const absl::string_view> keys,
absl::Span<const absl::string_view> values,
std::vector<std::string>* key_value_pairs) {
GPR_ASSERT(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
key_value_pairs->push_back(absl::StrCat(keys[i], "=", values[i]));
}
}
std::string MakeLabelString(
absl::Span<const absl::string_view> label_keys,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_label_keys,
absl::Span<const absl::string_view> optional_values) {
std::vector<std::string> key_value_pairs;
AddKeyValuePairs(label_keys, label_values, &key_value_pairs);
AddKeyValuePairs(optional_label_keys, optional_values, &key_value_pairs);
return absl::StrJoin(key_value_pairs, ",");
}
// TODO(yijiem): Move this to test/core/util/fake_stats_plugin.h
class FakeStatsPlugin : public StatsPlugin {
public:
bool IsEnabledForChannel(
const StatsPlugin::ChannelScope& scope) const override {
return channel_filter_(scope);
}
bool IsEnabledForServer(const ChannelArgs& /*args*/) const override {
return false;
}
void AddCounter(
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
// The problem with this approach is that we initialize uint64_counters_ in
// BuildAndRegister by querying the GlobalInstrumentsRegistry at the time.
// If the GlobalInstrumentsRegistry has changed since then (which we
// currently don't allow), we might not have seen that descriptor nor have
// we created an instrument for it. We probably could copy the existing
// instruments at build time and for the handle that we haven't seen we will
// just ignore it here. This would also prevent us from having to lock the
// GlobalInstrumentsRegistry everytime a metric is recorded. But this is not
// a concern for now.
auto iter = uint64_counters_.find(handle.index);
if (iter == uint64_counters_.end()) {
return;
}
iter->second.Add(value, label_values, optional_values);
}
void AddCounter(
GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, double value,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
auto iter = double_counters_.find(handle.index);
if (iter == double_counters_.end()) {
return;
}
iter->second.Add(value, label_values, optional_values);
}
void RecordHistogram(
GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
auto iter = uint64_histograms_.find(handle.index);
if (iter == uint64_histograms_.end()) {
return;
}
iter->second.Record(value, label_values, optional_values);
}
void RecordHistogram(
GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
auto iter = double_histograms_.find(handle.index);
if (iter == double_histograms_.end()) {
return;
}
iter->second.Record(value, label_values, optional_values);
}
absl::optional<uint64_t> GetCounterValue(
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = uint64_counters_.find(handle.index);
if (iter == uint64_counters_.end()) {
return absl::nullopt;
}
return iter->second.GetValue(label_values, optional_values);
}
absl::optional<double> GetCounterValue(
GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = double_counters_.find(handle.index);
if (iter == double_counters_.end()) {
return absl::nullopt;
}
return iter->second.GetValue(label_values, optional_values);
}
absl::optional<std::vector<uint64_t>> GetHistogramValue(
GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = uint64_histograms_.find(handle.index);
if (iter == uint64_histograms_.end()) {
return absl::nullopt;
}
return iter->second.GetValues(label_values, optional_values);
}
absl::optional<std::vector<double>> GetHistogramValue(
GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = double_histograms_.find(handle.index);
if (iter == double_histograms_.end()) {
return absl::nullopt;
}
return iter->second.GetValues(label_values, optional_values);
}
private:
friend class FakeStatsPluginBuilder;
explicit FakeStatsPlugin(
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter)
: channel_filter_(std::move(channel_filter)) {
GlobalInstrumentsRegistry::ForEach(
[this](const GlobalInstrumentsRegistry::GlobalInstrumentDescriptor&
descriptor) {
if (!descriptor.enable_by_default) {
return;
}
if (descriptor.instrument_type ==
GlobalInstrumentsRegistry::InstrumentType::kCounter) {
if (descriptor.value_type ==
GlobalInstrumentsRegistry::ValueType::kUInt64) {
uint64_counters_.emplace(descriptor.index, descriptor);
} else {
double_counters_.emplace(descriptor.index, descriptor);
}
} else {
EXPECT_EQ(descriptor.instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kHistogram);
if (descriptor.value_type ==
GlobalInstrumentsRegistry::ValueType::kUInt64) {
uint64_histograms_.emplace(descriptor.index, descriptor);
} else {
double_histograms_.emplace(descriptor.index, descriptor);
}
}
});
}
template <class T>
class Counter {
public:
explicit Counter(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u)
: name_(u.name),
description_(u.description),
unit_(u.unit),
label_keys_(std::move(u.label_keys)),
optional_label_keys_(std::move(u.optional_label_keys)) {}
void Add(T t, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter != storage_.end()) {
iter->second += t;
} else {
storage_[MakeLabelString(label_keys_, label_values,
optional_label_keys_, optional_values)] = t;
}
}
absl::optional<T> GetValue(
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter == storage_.end()) {
return absl::nullopt;
}
return iter->second;
}
private:
absl::string_view name_;
absl::string_view description_;
absl::string_view unit_;
std::vector<absl::string_view> label_keys_;
std::vector<absl::string_view> optional_label_keys_;
// Aggregation of the same key attributes.
absl::flat_hash_map<std::string, T> storage_;
};
template <class T>
class Histogram {
public:
explicit Histogram(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u)
: name_(u.name),
description_(u.description),
unit_(u.unit),
label_keys_(std::move(u.label_keys)),
optional_label_keys_(std::move(u.optional_label_keys)) {}
void Record(T t, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
std::string key = MakeLabelString(label_keys_, label_values,
optional_label_keys_, optional_values);
auto iter = storage_.find(key);
if (iter == storage_.end()) {
storage_.emplace(key, std::initializer_list<T>{t});
} else {
iter->second.push_back(t);
}
}
absl::optional<std::vector<T>> GetValues(
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter == storage_.end()) {
return absl::nullopt;
}
return iter->second;
}
private:
absl::string_view name_;
absl::string_view description_;
absl::string_view unit_;
std::vector<absl::string_view> label_keys_;
std::vector<absl::string_view> optional_label_keys_;
absl::flat_hash_map<std::string, std::vector<T>> storage_;
};
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter_;
// Instruments.
absl::flat_hash_map<uint32_t, Counter<uint64_t>> uint64_counters_;
absl::flat_hash_map<uint32_t, Counter<double>> double_counters_;
absl::flat_hash_map<uint32_t, Histogram<uint64_t>> uint64_histograms_;
absl::flat_hash_map<uint32_t, Histogram<double>> double_histograms_;
};
// TODO(yijiem): Move this to test/core/util/fake_stats_plugin.h
class FakeStatsPluginBuilder {
public:
FakeStatsPluginBuilder& SetChannelFilter(
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter) {
channel_filter_ = std::move(channel_filter);
return *this;
}
std::shared_ptr<FakeStatsPlugin> BuildAndRegister() {
auto f = std::shared_ptr<FakeStatsPlugin>(
new FakeStatsPlugin(std::move(channel_filter_)));
GlobalStatsPluginRegistry::RegisterStatsPlugin(f);
return f;
}
private:
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter_;
};
std::shared_ptr<FakeStatsPlugin> MakeStatsPluginForTarget(
absl::string_view target_suffix) {
return FakeStatsPluginBuilder()
.SetChannelFilter(
[target_suffix](const StatsPlugin::ChannelScope& scope) {
return absl::EndsWith(scope.target(), target_suffix);
})
.BuildAndRegister();
}
class MetricsTest : public testing::Test {
class MetricsTest : public ::testing::Test {
public:
void TearDown() override {
GlobalInstrumentsRegistry::TestOnlyResetGlobalInstrumentsRegistry();
GlobalStatsPluginRegistry::TestOnlyResetGlobalStatsPluginRegistry();
GlobalInstrumentsRegistryTestPeer::ResetGlobalInstrumentsRegistry();
GlobalStatsPluginRegistryTestPeer::ResetGlobalStatsPluginRegistry();
}
};

@ -219,6 +219,7 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//:grpc",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util",
],
)
@ -235,6 +236,7 @@ grpc_cc_test(
deps = [
":lb_policy_test_lib",
"//src/core:grpc_lb_policy_weighted_round_robin",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util",
],
)

@ -587,7 +587,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
queue_.push_back(ReresolutionRequested());
}
absl::string_view GetAuthority() override { return "server.example.com"; }
absl::string_view GetTarget() override { return test_->target_; }
absl::string_view GetAuthority() override { return test_->authority_; }
RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
return nullptr;
@ -602,6 +604,11 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return test_->fuzzing_ee_.get();
}
GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup()
override {
return test_->stats_plugin_group_;
}
void AddTraceEvent(TraceSeverity, absl::string_view) override {}
LoadBalancingPolicyTest* test_;
@ -1494,6 +1501,9 @@ class LoadBalancingPolicyTest : public ::testing::Test {
OrphanablePtr<LoadBalancingPolicy> lb_policy_;
const absl::string_view lb_policy_name_;
const ChannelArgs channel_args_;
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_;
std::string target_ = "dns:server.example.com";
std::string authority_ = "server.example.com";
};
} // namespace testing

@ -49,14 +49,18 @@
#include "src/core/lib/json/json_writer.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/weighted_target/weighted_target.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/fake_stats_plugin.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
constexpr absl::string_view kLocalityName = "locality0";
class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
protected:
class ConfigBuilder {
@ -103,7 +107,11 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest {
Json::Object json_;
};
WeightedRoundRobinTest() : LoadBalancingPolicyTest("weighted_round_robin") {}
WeightedRoundRobinTest()
: LoadBalancingPolicyTest(
"weighted_round_robin",
ChannelArgs().Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD,
kLocalityName)) {}
void SetUp() override {
LoadBalancingPolicyTest::SetUp();
@ -989,6 +997,173 @@ TEST_F(WeightedRoundRobinTest, MultipleAddressesPerEndpoint) {
EXPECT_FALSE(subchannel3_1->ConnectionRequested());
}
TEST_F(WeightedRoundRobinTest, MetricDefinitionRrFallback) {
const auto* descriptor =
GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
"grpc.lb.wrr.rr_fallback");
ASSERT_NE(descriptor, nullptr);
EXPECT_EQ(descriptor->value_type,
GlobalInstrumentsRegistry::ValueType::kUInt64);
EXPECT_EQ(descriptor->instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kCounter);
EXPECT_EQ(descriptor->enable_by_default, false);
EXPECT_EQ(descriptor->name, "grpc.lb.wrr.rr_fallback");
EXPECT_EQ(descriptor->unit, "{update}");
EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
EXPECT_THAT(descriptor->optional_label_keys,
::testing::ElementsAre("grpc.lb.locality"));
}
TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightNotYetUsable) {
const auto* descriptor =
GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
"grpc.lb.wrr.endpoint_weight_not_yet_usable");
ASSERT_NE(descriptor, nullptr);
EXPECT_EQ(descriptor->value_type,
GlobalInstrumentsRegistry::ValueType::kUInt64);
EXPECT_EQ(descriptor->instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kCounter);
EXPECT_EQ(descriptor->enable_by_default, false);
EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_not_yet_usable");
EXPECT_EQ(descriptor->unit, "{endpoint}");
EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
EXPECT_THAT(descriptor->optional_label_keys,
::testing::ElementsAre("grpc.lb.locality"));
}
TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightStale) {
const auto* descriptor =
GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
"grpc.lb.wrr.endpoint_weight_stale");
ASSERT_NE(descriptor, nullptr);
EXPECT_EQ(descriptor->value_type,
GlobalInstrumentsRegistry::ValueType::kUInt64);
EXPECT_EQ(descriptor->instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kCounter);
EXPECT_EQ(descriptor->enable_by_default, false);
EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_stale");
EXPECT_EQ(descriptor->unit, "{endpoint}");
EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
EXPECT_THAT(descriptor->optional_label_keys,
::testing::ElementsAre("grpc.lb.locality"));
}
TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeights) {
const auto* descriptor =
GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
"grpc.lb.wrr.endpoint_weights");
ASSERT_NE(descriptor, nullptr);
EXPECT_EQ(descriptor->value_type,
GlobalInstrumentsRegistry::ValueType::kDouble);
EXPECT_EQ(descriptor->instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kHistogram);
EXPECT_EQ(descriptor->enable_by_default, false);
EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weights");
EXPECT_EQ(descriptor->unit, "{weight}");
EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target"));
EXPECT_THAT(descriptor->optional_label_keys,
::testing::ElementsAre("grpc.lb.locality"));
}
TEST_F(WeightedRoundRobinTest, MetricValues) {
if (!IsWrrDelegateToPickFirstEnabled()) return;
const auto kRrFallback =
GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
"grpc.lb.wrr.rr_fallback")
.value();
const auto kEndpointWeightNotYetUsable =
GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
"grpc.lb.wrr.endpoint_weight_not_yet_usable")
.value();
const auto kEndpointWeightStale =
GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
"grpc.lb.wrr.endpoint_weight_stale")
.value();
const auto kEndpointWeights =
GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName(
"grpc.lb.wrr.endpoint_weights")
.value();
const absl::string_view kLabelValues[] = {target_};
const absl::string_view kOptionalLabelValues[] = {kLocalityName};
auto stats_plugin = std::make_shared<FakeStatsPlugin>(
nullptr, /*use_disabled_by_default_metrics=*/true);
stats_plugin_group_.push_back(stats_plugin);
// Send address list to LB policy.
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
auto picker = SendInitialUpdateAndWaitForConnected(
kAddresses,
ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2)));
ASSERT_NE(picker, nullptr);
// Address 0 gets weight 1, address 1 gets weight 3.
// No utilization report from backend 2, so it gets the average weight 2.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}});
// Now have backend 2 report utilization the same as backend 1, so its
// weight will be the same.
WaitForWeightedRoundRobinPicks(
&picker,
{{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)},
{kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3,
/*qps=*/100.0, /*eps=*/0.0)}},
{{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// Check endpoint weights.
EXPECT_THAT(stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues,
kOptionalLabelValues),
::testing::Optional(::testing::ElementsAre(
// Picker created for first endpoint becoming READY.
0,
// Picker update for second endpoint CONNECTING.
0,
// Picker update for second endpoint READY.
0, 0,
// Picker update for third endpoint CONNECTING.
0, 0,
// Picker update for third endpoint READY.
0, 0, 0,
// Weights for first two endpoints now start getting used.
::testing::DoubleNear(111.111115, 0.000001),
::testing::DoubleNear(333.333344, 0.000001), 0,
// Weights for all endpoints are now used.
::testing::DoubleNear(111.111115, 0.000001),
::testing::DoubleNear(333.333344, 0.000001),
::testing::DoubleNear(333.333344, 0.000001))));
// RR fallback should trigger for the first 5 updates above, because
// there are less than two endpoints with valid weights.
EXPECT_THAT(stats_plugin->GetCounterValue(kRrFallback, kLabelValues,
kOptionalLabelValues),
::testing::Optional(5));
// Endpoint-not-yet-usable will be incremented once for every endpoint
// with weight 0 above.
EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightNotYetUsable,
kLabelValues, kOptionalLabelValues),
::testing::Optional(10));
// There are no stale endpoint weights so far.
EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightStale, kLabelValues,
kOptionalLabelValues),
::testing::Optional(0));
// Advance time to make weights stale and trigger the timer callback
// to recompute weights.
gpr_log(GPR_INFO, "advancing time to trigger staleness...");
IncrementTimeBy(Duration::Seconds(2));
// Picker should now be falling back to round-robin.
ExpectWeightedRoundRobinPicks(
picker.get(), {},
{{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}});
// All three endpoints should now have stale weights.
EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightStale, kLabelValues,
kOptionalLabelValues),
::testing::Optional(3));
}
} // namespace
} // namespace testing
} // namespace grpc_core

@ -494,10 +494,21 @@ grpc_cc_library(
grpc_cc_library(
name = "fake_stats_plugin",
testonly = True,
srcs = ["fake_stats_plugin.cc"],
hdrs = ["fake_stats_plugin.h"],
external_deps = [
"absl/container:flat_hash_map",
"absl/functional:any_invocable",
"absl/status",
"absl/strings",
"absl/types:optional",
"absl/types:span",
"gtest",
],
deps = [
"//:grpc",
"//src/core:examine_stack",
"//src/core:metrics",
],
)

@ -79,4 +79,109 @@ void RegisterFakeStatsPlugin() {
});
}
namespace {
void AddKeyValuePairs(absl::Span<const absl::string_view> keys,
absl::Span<const absl::string_view> values,
std::vector<std::string>* key_value_pairs) {
GPR_ASSERT(keys.size() == values.size());
for (size_t i = 0; i < keys.size(); ++i) {
key_value_pairs->push_back(absl::StrCat(keys[i], "=", values[i]));
}
}
} // namespace
std::string MakeLabelString(
absl::Span<const absl::string_view> label_keys,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_label_keys,
absl::Span<const absl::string_view> optional_values) {
std::vector<std::string> key_value_pairs;
AddKeyValuePairs(label_keys, label_values, &key_value_pairs);
AddKeyValuePairs(optional_label_keys, optional_values, &key_value_pairs);
return absl::StrJoin(key_value_pairs, ",");
}
std::shared_ptr<FakeStatsPlugin> MakeStatsPluginForTarget(
absl::string_view target_suffix) {
return FakeStatsPluginBuilder()
.SetChannelFilter(
[target_suffix](const StatsPlugin::ChannelScope& scope) {
return absl::EndsWith(scope.target(), target_suffix);
})
.BuildAndRegister();
}
void GlobalInstrumentsRegistryTestPeer::ResetGlobalInstrumentsRegistry() {
auto& instruments = GlobalInstrumentsRegistry::GetInstrumentList();
instruments.clear();
}
namespace {
template <typename HandleType>
absl::optional<HandleType> FindInstrument(
const absl::flat_hash_map<
absl::string_view,
GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>& instruments,
absl::string_view name, GlobalInstrumentsRegistry::ValueType value_type,
GlobalInstrumentsRegistry::InstrumentType instrument_type) {
auto it = instruments.find(name);
if (it != instruments.end() && it->second.value_type == value_type &&
it->second.instrument_type == instrument_type) {
HandleType handle;
handle.index = it->second.index;
return handle;
}
return absl::nullopt;
}
} // namespace
absl::optional<GlobalInstrumentsRegistry::GlobalUInt64CounterHandle>
GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName(
absl::string_view name) {
return FindInstrument<GlobalInstrumentsRegistry::GlobalUInt64CounterHandle>(
GlobalInstrumentsRegistry::GetInstrumentList(), name,
GlobalInstrumentsRegistry::ValueType::kUInt64,
GlobalInstrumentsRegistry::InstrumentType::kCounter);
}
absl::optional<GlobalInstrumentsRegistry::GlobalDoubleCounterHandle>
GlobalInstrumentsRegistryTestPeer::FindDoubleCounterHandleByName(
absl::string_view name) {
return FindInstrument<GlobalInstrumentsRegistry::GlobalDoubleCounterHandle>(
GlobalInstrumentsRegistry::GetInstrumentList(), name,
GlobalInstrumentsRegistry::ValueType::kDouble,
GlobalInstrumentsRegistry::InstrumentType::kCounter);
}
absl::optional<GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle>
GlobalInstrumentsRegistryTestPeer::FindUInt64HistogramHandleByName(
absl::string_view name) {
return FindInstrument<GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle>(
GlobalInstrumentsRegistry::GetInstrumentList(), name,
GlobalInstrumentsRegistry::ValueType::kUInt64,
GlobalInstrumentsRegistry::InstrumentType::kHistogram);
}
absl::optional<GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle>
GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName(
absl::string_view name) {
return FindInstrument<GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle>(
GlobalInstrumentsRegistry::GetInstrumentList(), name,
GlobalInstrumentsRegistry::ValueType::kDouble,
GlobalInstrumentsRegistry::InstrumentType::kHistogram);
}
GlobalInstrumentsRegistry::GlobalInstrumentDescriptor*
GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName(
absl::string_view name) {
auto& instruments = GlobalInstrumentsRegistry::GetInstrumentList();
auto it = instruments.find(name);
if (it != instruments.end()) return &it->second;
return nullptr;
}
} // namespace grpc_core

@ -19,7 +19,16 @@
#include <string>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "gmock/gmock.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/metrics.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/channel/tcp_tracer.h"
@ -187,6 +196,315 @@ class FakeServerCallTracer : public ServerCallTracer {
std::vector<std::string>* annotation_logger_;
};
std::string MakeLabelString(
absl::Span<const absl::string_view> label_keys,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_label_keys,
absl::Span<const absl::string_view> optional_values);
class FakeStatsPlugin : public StatsPlugin {
public:
explicit FakeStatsPlugin(
absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const>
channel_filter = nullptr,
bool use_disabled_by_default_metrics = false)
: channel_filter_(std::move(channel_filter)) {
GlobalInstrumentsRegistry::ForEach(
[&](const GlobalInstrumentsRegistry::GlobalInstrumentDescriptor&
descriptor) {
if (!use_disabled_by_default_metrics &&
!descriptor.enable_by_default) {
gpr_log(GPR_INFO,
"FakeStatsPlugin[%p]: skipping disabled metric: %s", this,
std::string(descriptor.name).c_str());
return;
}
if (descriptor.instrument_type ==
GlobalInstrumentsRegistry::InstrumentType::kCounter) {
if (descriptor.value_type ==
GlobalInstrumentsRegistry::ValueType::kUInt64) {
uint64_counters_.emplace(descriptor.index, descriptor);
} else {
double_counters_.emplace(descriptor.index, descriptor);
}
} else {
EXPECT_EQ(descriptor.instrument_type,
GlobalInstrumentsRegistry::InstrumentType::kHistogram);
if (descriptor.value_type ==
GlobalInstrumentsRegistry::ValueType::kUInt64) {
uint64_histograms_.emplace(descriptor.index, descriptor);
} else {
double_histograms_.emplace(descriptor.index, descriptor);
}
}
});
}
bool IsEnabledForChannel(const ChannelScope& scope) const override {
if (channel_filter_ == nullptr) return true;
return channel_filter_(scope);
}
bool IsEnabledForServer(const ChannelArgs& /*args*/) const override {
return false;
}
void AddCounter(
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
// The problem with this approach is that we initialize uint64_counters_ in
// BuildAndRegister by querying the GlobalInstrumentsRegistry at the time.
// If the GlobalInstrumentsRegistry has changed since then (which we
// currently don't allow), we might not have seen that descriptor nor have
// we created an instrument for it. We probably could copy the existing
// instruments at build time and for the handle that we haven't seen we will
// just ignore it here. This would also prevent us from having to lock the
// GlobalInstrumentsRegistry everytime a metric is recorded. But this is not
// a concern for now.
gpr_log(GPR_INFO,
"FakeStatsPlugin[%p]::AddCounter(index=%u, value=(uint64)%lu, "
"label_values={%s}, optional_label_values={%s}",
this, handle.index, value,
absl::StrJoin(label_values, ", ").c_str(),
absl::StrJoin(optional_values, ", ").c_str());
auto iter = uint64_counters_.find(handle.index);
if (iter == uint64_counters_.end()) return;
iter->second.Add(value, label_values, optional_values);
}
void AddCounter(
GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, double value,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
gpr_log(GPR_INFO,
"FakeStatsPlugin[%p]::AddCounter(index=%u, value(double)=%f, "
"label_values={%s}, optional_label_values={%s}",
this, handle.index, value,
absl::StrJoin(label_values, ", ").c_str(),
absl::StrJoin(optional_values, ", ").c_str());
auto iter = double_counters_.find(handle.index);
if (iter == double_counters_.end()) return;
iter->second.Add(value, label_values, optional_values);
}
void RecordHistogram(
GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
gpr_log(GPR_INFO,
"FakeStatsPlugin[%p]::RecordHistogram(index=%u, value=(uint64)%lu, "
"label_values={%s}, optional_label_values={%s}",
this, handle.index, value,
absl::StrJoin(label_values, ", ").c_str(),
absl::StrJoin(optional_values, ", ").c_str());
auto iter = uint64_histograms_.find(handle.index);
if (iter == uint64_histograms_.end()) return;
iter->second.Record(value, label_values, optional_values);
}
void RecordHistogram(
GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) override {
gpr_log(GPR_INFO,
"FakeStatsPlugin[%p]::RecordHistogram(index=%u, value=(double)%f, "
"label_values={%s}, optional_label_values={%s}",
this, handle.index, value,
absl::StrJoin(label_values, ", ").c_str(),
absl::StrJoin(optional_values, ", ").c_str());
auto iter = double_histograms_.find(handle.index);
if (iter == double_histograms_.end()) return;
iter->second.Record(value, label_values, optional_values);
}
absl::optional<uint64_t> GetCounterValue(
GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = uint64_counters_.find(handle.index);
if (iter == uint64_counters_.end()) {
return absl::nullopt;
}
return iter->second.GetValue(label_values, optional_values);
}
absl::optional<double> GetCounterValue(
GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = double_counters_.find(handle.index);
if (iter == double_counters_.end()) {
return absl::nullopt;
}
return iter->second.GetValue(label_values, optional_values);
}
absl::optional<std::vector<uint64_t>> GetHistogramValue(
GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = uint64_histograms_.find(handle.index);
if (iter == uint64_histograms_.end()) {
return absl::nullopt;
}
return iter->second.GetValues(label_values, optional_values);
}
absl::optional<std::vector<double>> GetHistogramValue(
GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = double_histograms_.find(handle.index);
if (iter == double_histograms_.end()) {
return absl::nullopt;
}
return iter->second.GetValues(label_values, optional_values);
}
private:
template <class T>
class Counter {
public:
explicit Counter(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u)
: name_(u.name),
description_(u.description),
unit_(u.unit),
label_keys_(std::move(u.label_keys)),
optional_label_keys_(std::move(u.optional_label_keys)) {}
void Add(T t, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter != storage_.end()) {
iter->second += t;
} else {
storage_[MakeLabelString(label_keys_, label_values,
optional_label_keys_, optional_values)] = t;
}
}
absl::optional<T> GetValue(
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter == storage_.end()) {
return absl::nullopt;
}
return iter->second;
}
private:
absl::string_view name_;
absl::string_view description_;
absl::string_view unit_;
std::vector<absl::string_view> label_keys_;
std::vector<absl::string_view> optional_label_keys_;
// Aggregation of the same key attributes.
absl::flat_hash_map<std::string, T> storage_;
};
template <class T>
class Histogram {
public:
explicit Histogram(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u)
: name_(u.name),
description_(u.description),
unit_(u.unit),
label_keys_(std::move(u.label_keys)),
optional_label_keys_(std::move(u.optional_label_keys)) {}
void Record(T t, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
std::string key = MakeLabelString(label_keys_, label_values,
optional_label_keys_, optional_values);
auto iter = storage_.find(key);
if (iter == storage_.end()) {
storage_.emplace(key, std::initializer_list<T>{t});
} else {
iter->second.push_back(t);
}
}
absl::optional<std::vector<T>> GetValues(
absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
auto iter = storage_.find(MakeLabelString(
label_keys_, label_values, optional_label_keys_, optional_values));
if (iter == storage_.end()) {
return absl::nullopt;
}
return iter->second;
}
private:
absl::string_view name_;
absl::string_view description_;
absl::string_view unit_;
std::vector<absl::string_view> label_keys_;
std::vector<absl::string_view> optional_label_keys_;
absl::flat_hash_map<std::string, std::vector<T>> storage_;
};
absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const> channel_filter_;
// Instruments.
absl::flat_hash_map<uint32_t, Counter<uint64_t>> uint64_counters_;
absl::flat_hash_map<uint32_t, Counter<double>> double_counters_;
absl::flat_hash_map<uint32_t, Histogram<uint64_t>> uint64_histograms_;
absl::flat_hash_map<uint32_t, Histogram<double>> double_histograms_;
};
class FakeStatsPluginBuilder {
public:
FakeStatsPluginBuilder& SetChannelFilter(
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter) {
channel_filter_ = std::move(channel_filter);
return *this;
}
FakeStatsPluginBuilder& UseDisabledByDefaultMetrics(bool value) {
use_disabled_by_default_metrics_ = value;
return *this;
}
std::shared_ptr<FakeStatsPlugin> BuildAndRegister() {
auto f = std::make_shared<FakeStatsPlugin>(
std::move(channel_filter_), use_disabled_by_default_metrics_);
GlobalStatsPluginRegistry::RegisterStatsPlugin(f);
return f;
}
private:
absl::AnyInvocable<bool(const StatsPlugin::ChannelScope& /*scope*/) const>
channel_filter_;
bool use_disabled_by_default_metrics_ = false;
};
std::shared_ptr<FakeStatsPlugin> MakeStatsPluginForTarget(
absl::string_view target_suffix);
class GlobalInstrumentsRegistryTestPeer {
public:
static void ResetGlobalInstrumentsRegistry();
static absl::optional<GlobalInstrumentsRegistry::GlobalUInt64CounterHandle>
FindUInt64CounterHandleByName(absl::string_view name);
static absl::optional<GlobalInstrumentsRegistry::GlobalDoubleCounterHandle>
FindDoubleCounterHandleByName(absl::string_view name);
static absl::optional<GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle>
FindUInt64HistogramHandleByName(absl::string_view name);
static absl::optional<GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle>
FindDoubleHistogramHandleByName(absl::string_view name);
static GlobalInstrumentsRegistry::GlobalInstrumentDescriptor*
FindMetricDescriptorByName(absl::string_view name);
};
class GlobalStatsPluginRegistryTestPeer {
public:
static void ResetGlobalStatsPluginRegistry() {
MutexLock lock(&*GlobalStatsPluginRegistry::mutex_);
GlobalStatsPluginRegistry::plugins_->clear();
}
};
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_FAKE_STATS_PLUGIN_H

@ -329,6 +329,7 @@ grpc_cc_test(
"//src/proto/grpc/testing/xds/v3:client_side_weighted_round_robin_proto",
"//src/proto/grpc/testing/xds/v3:cluster_proto",
"//src/proto/grpc/testing/xds/v3:wrr_locality_proto",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
],

@ -27,8 +27,10 @@
#include "src/core/client_channel/backup_poller.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/proto/grpc/testing/xds/v3/client_side_weighted_round_robin.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h"
#include "test/core/util/fake_stats_plugin.h"
#include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -40,12 +42,25 @@ using ::envoy::extensions::load_balancing_policies::
client_side_weighted_round_robin::v3::ClientSideWeightedRoundRobin;
using ::envoy::extensions::load_balancing_policies::wrr_locality::v3::
WrrLocality;
using WrrTest = XdsEnd2endTest;
class WrrTest : public XdsEnd2endTest {
protected:
void SetUp() override {
// No-op -- tests must explicitly call InitClient().
}
static std::string LocalityNameString(absl::string_view sub_zone) {
return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
kDefaultLocalityRegion, kDefaultLocalityZone,
sub_zone);
}
};
INSTANTIATE_TEST_SUITE_P(XdsTest, WrrTest, ::testing::Values(XdsTestType()),
&XdsTestType::Name);
TEST_P(WrrTest, Basic) {
InitClient();
CreateAndStartBackends(3);
// Expected weights = qps / (util + (eps/qps)) =
// 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3
@ -92,6 +107,48 @@ TEST_P(WrrTest, Basic) {
});
}
TEST_P(WrrTest, MetricsHaveLocalityLabel) {
if (!grpc_core::IsWrrDelegateToPickFirstEnabled()) return;
const auto kEndpointWeights =
grpc_core::GlobalInstrumentsRegistryTestPeer::
FindDoubleHistogramHandleByName("grpc.lb.wrr.endpoint_weights")
.value();
const std::string target = absl::StrCat("xds:", kServerName);
const absl::string_view kLabelValues[] = {/*target=*/target};
// Register stats plugin before initializing client.
auto stats_plugin = grpc_core::FakeStatsPluginBuilder()
.UseDisabledByDefaultMetrics(true)
.BuildAndRegister();
InitClient();
CreateAndStartBackends(2);
auto cluster = default_cluster_;
WrrLocality wrr_locality;
wrr_locality.mutable_endpoint_picking_policy()
->add_policies()
->mutable_typed_extension_config()
->mutable_typed_config()
->PackFrom(ClientSideWeightedRoundRobin());
cluster.mutable_load_balancing_policy()
->add_policies()
->mutable_typed_extension_config()
->mutable_typed_config()
->PackFrom(wrr_locality);
balancer_->ads_service()->SetCdsResource(cluster);
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)},
{"locality1", CreateEndpointsForBackends(1, 2)}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
WaitForAllBackends(DEBUG_LOCATION);
// Make sure we have a metric value for each of the two localities.
EXPECT_THAT(
stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues,
{LocalityNameString("locality0")}),
::testing::Optional(::testing::Not(::testing::IsEmpty())));
EXPECT_THAT(
stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues,
{LocalityNameString("locality1")}),
::testing::Optional(::testing::Not(::testing::IsEmpty())));
}
} // namespace
} // namespace testing
} // namespace grpc

@ -2160,6 +2160,8 @@ src/core/lib/channel/channelz_registry.h \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/metrics.cc \
src/core/lib/channel/metrics.h \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/server_call_tracer_filter.cc \
@ -2891,6 +2893,7 @@ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.cc \
src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h \
src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc \
src/core/load_balancing/weighted_target/weighted_target.cc \
src/core/load_balancing/weighted_target/weighted_target.h \
src/core/load_balancing/xds/cds.cc \
src/core/load_balancing/xds/xds_channel_args.h \
src/core/load_balancing/xds/xds_cluster_impl.cc \

@ -1932,6 +1932,8 @@ src/core/lib/channel/channelz_registry.h \
src/core/lib/channel/connected_channel.cc \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/metrics.cc \
src/core/lib/channel/metrics.h \
src/core/lib/channel/promise_based_filter.cc \
src/core/lib/channel/promise_based_filter.h \
src/core/lib/channel/server_call_tracer_filter.cc \
@ -2668,6 +2670,7 @@ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.cc \
src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h \
src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc \
src/core/load_balancing/weighted_target/weighted_target.cc \
src/core/load_balancing/weighted_target/weighted_target.h \
src/core/load_balancing/xds/cds.cc \
src/core/load_balancing/xds/xds_channel_args.h \
src/core/load_balancing/xds/xds_cluster_impl.cc \

Loading…
Cancel
Save