diff --git a/BUILD b/BUILD index af9e76174d4..ad1796a488e 100644 --- a/BUILD +++ b/BUILD @@ -1817,6 +1817,7 @@ grpc_cc_library( "//src/core:message", "//src/core:metadata", "//src/core:metadata_batch", + "//src/core:metrics", "//src/core:no_destruct", "//src/core:per_cpu", "//src/core:pipe", diff --git a/CMakeLists.txt b/CMakeLists.txt index 6be079f93a5..366897b002c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2210,6 +2210,7 @@ add_library(grpc src/core/lib/channel/channelz.cc src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc + src/core/lib/channel/metrics.cc src/core/lib/channel/promise_based_filter.cc src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc @@ -2977,6 +2978,7 @@ add_library(grpc_unsecure src/core/lib/channel/channelz.cc src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc + src/core/lib/channel/metrics.cc src/core/lib/channel/promise_based_filter.cc src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc @@ -5103,6 +5105,7 @@ add_library(grpc_authorization_provider src/core/lib/channel/channelz.cc src/core/lib/channel/channelz_registry.cc src/core/lib/channel/connected_channel.cc + src/core/lib/channel/metrics.cc src/core/lib/channel/promise_based_filter.cc src/core/lib/channel/server_call_tracer_filter.cc src/core/lib/channel/status_util.cc @@ -19563,8 +19566,8 @@ endif() if(gRPC_BUILD_TESTS) add_executable(metrics_test - src/core/lib/channel/metrics.cc test/core/channel/metrics_test.cc + test/core/util/fake_stats_plugin.cc ) if(WIN32 AND MSVC) if(BUILD_SHARED_LIBS) @@ -31749,6 +31752,7 @@ if(gRPC_BUILD_TESTS) add_executable(weighted_round_robin_config_test test/core/client_channel/lb_policy/weighted_round_robin_config_test.cc + test/core/util/fake_stats_plugin.cc ) if(WIN32 AND MSVC) if(BUILD_SHARED_LIBS) @@ -31797,6 +31801,7 @@ add_executable(weighted_round_robin_test test/core/client_channel/lb_policy/weighted_round_robin_test.cc test/core/event_engine/event_engine_test_utils.cc test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/util/fake_stats_plugin.cc ) if(WIN32 AND MSVC) if(BUILD_SHARED_LIBS) @@ -36106,6 +36111,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h + test/core/util/fake_stats_plugin.cc test/cpp/end2end/test_service_impl.cc test/cpp/end2end/xds/xds_end2end_test_lib.cc test/cpp/end2end/xds/xds_server.cc diff --git a/Makefile b/Makefile index b4dbdfea859..4f4090053a3 100644 --- a/Makefile +++ b/Makefile @@ -1391,6 +1391,7 @@ LIBGRPC_SRC = \ src/core/lib/channel/channelz.cc \ src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ + src/core/lib/channel/metrics.cc \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ @@ -1992,6 +1993,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/channel/channelz.cc \ src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ + src/core/lib/channel/metrics.cc \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ diff --git a/Package.swift b/Package.swift index 6156daefbb9..62c2fce19ec 100644 --- a/Package.swift +++ b/Package.swift @@ -1155,6 +1155,8 @@ let package = Package( "src/core/lib/channel/connected_channel.cc", "src/core/lib/channel/connected_channel.h", "src/core/lib/channel/context.h", + "src/core/lib/channel/metrics.cc", + "src/core/lib/channel/metrics.h", "src/core/lib/channel/promise_based_filter.cc", "src/core/lib/channel/promise_based_filter.h", "src/core/lib/channel/server_call_tracer_filter.cc", @@ -1890,6 +1892,7 @@ let package = Package( "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h", "src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc", "src/core/load_balancing/weighted_target/weighted_target.cc", + "src/core/load_balancing/weighted_target/weighted_target.h", "src/core/load_balancing/xds/cds.cc", "src/core/load_balancing/xds/xds_channel_args.h", "src/core/load_balancing/xds/xds_cluster_impl.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index cd50e12b948..445e5a15bc0 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -823,6 +823,7 @@ libs: - src/core/lib/channel/channelz_registry.h - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h + - src/core/lib/channel/metrics.h - src/core/lib/channel/promise_based_filter.h - src/core/lib/channel/status_util.h - src/core/lib/channel/tcp_tracer.h @@ -1177,6 +1178,7 @@ libs: - src/core/load_balancing/subchannel_interface.h - src/core/load_balancing/subchannel_list.h - src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h + - src/core/load_balancing/weighted_target/weighted_target.h - src/core/load_balancing/xds/xds_channel_args.h - src/core/load_balancing/xds/xds_override_host.h - src/core/resolver/dns/c_ares/dns_resolver_ares.h @@ -1665,6 +1667,7 @@ libs: - src/core/lib/channel/channelz.cc - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc + - src/core/lib/channel/metrics.cc - src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc @@ -2319,6 +2322,7 @@ libs: - src/core/lib/channel/channelz_registry.h - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h + - src/core/lib/channel/metrics.h - src/core/lib/channel/promise_based_filter.h - src/core/lib/channel/status_util.h - src/core/lib/channel/tcp_tracer.h @@ -2636,6 +2640,7 @@ libs: - src/core/load_balancing/subchannel_interface.h - src/core/load_balancing/subchannel_list.h - src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h + - src/core/load_balancing/weighted_target/weighted_target.h - src/core/resolver/dns/c_ares/dns_resolver_ares.h - src/core/resolver/dns/c_ares/grpc_ares_ev_driver.h - src/core/resolver/dns/c_ares/grpc_ares_wrapper.h @@ -2784,6 +2789,7 @@ libs: - src/core/lib/channel/channelz.cc - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc + - src/core/lib/channel/metrics.cc - src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc @@ -4393,6 +4399,7 @@ libs: - src/core/lib/channel/channelz_registry.h - src/core/lib/channel/connected_channel.h - src/core/lib/channel/context.h + - src/core/lib/channel/metrics.h - src/core/lib/channel/promise_based_filter.h - src/core/lib/channel/status_util.h - src/core/lib/channel/tcp_tracer.h @@ -4741,6 +4748,7 @@ libs: - src/core/lib/channel/channelz.cc - src/core/lib/channel/channelz_registry.cc - src/core/lib/channel/connected_channel.cc + - src/core/lib/channel/metrics.cc - src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/server_call_tracer_filter.cc - src/core/lib/channel/status_util.cc @@ -12714,10 +12722,10 @@ targets: build: test language: c++ headers: - - src/core/lib/channel/metrics.h + - test/core/util/fake_stats_plugin.h src: - - src/core/lib/channel/metrics.cc - test/core/channel/metrics_test.cc + - test/core/util/fake_stats_plugin.cc deps: - gtest - grpc_test_util @@ -19564,9 +19572,11 @@ targets: gtest: true build: test language: c++ - headers: [] + headers: + - test/core/util/fake_stats_plugin.h src: - test/core/client_channel/lb_policy/weighted_round_robin_config_test.cc + - test/core/util/fake_stats_plugin.cc deps: - gtest - grpc_test_util @@ -19579,11 +19589,13 @@ targets: - test/core/client_channel/lb_policy/lb_policy_test_lib.h - test/core/event_engine/event_engine_test_utils.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/util/fake_stats_plugin.h src: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/client_channel/lb_policy/weighted_round_robin_test.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + - test/core/util/fake_stats_plugin.cc deps: - gtest - protobuf @@ -21224,6 +21236,7 @@ targets: build: test language: c++ headers: + - test/core/util/fake_stats_plugin.h - test/core/util/scoped_env_var.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h @@ -21265,6 +21278,7 @@ targets: - src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/string.proto - src/proto/grpc/testing/xds/v3/wrr_locality.proto + - test/core/util/fake_stats_plugin.cc - test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/xds/xds_end2end_test_lib.cc - test/cpp/end2end/xds/xds_server.cc diff --git a/config.m4 b/config.m4 index c823a980b8f..a766c912446 100644 --- a/config.m4 +++ b/config.m4 @@ -474,6 +474,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/channel/channelz.cc \ src/core/lib/channel/channelz_registry.cc \ src/core/lib/channel/connected_channel.cc \ + src/core/lib/channel/metrics.cc \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/server_call_tracer_filter.cc \ src/core/lib/channel/status_util.cc \ diff --git a/config.w32 b/config.w32 index 2275f0669b5..47e5ac16eba 100644 --- a/config.w32 +++ b/config.w32 @@ -439,6 +439,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\channel\\channelz.cc " + "src\\core\\lib\\channel\\channelz_registry.cc " + "src\\core\\lib\\channel\\connected_channel.cc " + + "src\\core\\lib\\channel\\metrics.cc " + "src\\core\\lib\\channel\\promise_based_filter.cc " + "src\\core\\lib\\channel\\server_call_tracer_filter.cc " + "src\\core\\lib\\channel\\status_util.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 939670687e5..32e7dd024d0 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -903,6 +903,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/channelz_registry.h', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/metrics.h', 'src/core/lib/channel/promise_based_filter.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/channel/tcp_tracer.h', @@ -1282,6 +1283,7 @@ Pod::Spec.new do |s| 'src/core/load_balancing/subchannel_interface.h', 'src/core/load_balancing/subchannel_list.h', 'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h', + 'src/core/load_balancing/weighted_target/weighted_target.h', 'src/core/load_balancing/xds/xds_channel_args.h', 'src/core/load_balancing/xds/xds_override_host.h', 'src/core/resolver/dns/c_ares/dns_resolver_ares.h', @@ -2159,6 +2161,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/channelz_registry.h', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/metrics.h', 'src/core/lib/channel/promise_based_filter.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/channel/tcp_tracer.h', @@ -2538,6 +2541,7 @@ Pod::Spec.new do |s| 'src/core/load_balancing/subchannel_interface.h', 'src/core/load_balancing/subchannel_list.h', 'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h', + 'src/core/load_balancing/weighted_target/weighted_target.h', 'src/core/load_balancing/xds/xds_channel_args.h', 'src/core/load_balancing/xds/xds_override_host.h', 'src/core/resolver/dns/c_ares/dns_resolver_ares.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index eba3b22cdbf..9d4c4079b4e 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1268,6 +1268,8 @@ Pod::Spec.new do |s| 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/metrics.cc', + 'src/core/lib/channel/metrics.h', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/promise_based_filter.h', 'src/core/lib/channel/server_call_tracer_filter.cc', @@ -1999,6 +2001,7 @@ Pod::Spec.new do |s| 'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h', 'src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc', 'src/core/load_balancing/weighted_target/weighted_target.cc', + 'src/core/load_balancing/weighted_target/weighted_target.h', 'src/core/load_balancing/xds/cds.cc', 'src/core/load_balancing/xds/xds_channel_args.h', 'src/core/load_balancing/xds/xds_cluster_impl.cc', @@ -2938,6 +2941,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/channelz_registry.h', 'src/core/lib/channel/connected_channel.h', 'src/core/lib/channel/context.h', + 'src/core/lib/channel/metrics.h', 'src/core/lib/channel/promise_based_filter.h', 'src/core/lib/channel/status_util.h', 'src/core/lib/channel/tcp_tracer.h', @@ -3317,6 +3321,7 @@ Pod::Spec.new do |s| 'src/core/load_balancing/subchannel_interface.h', 'src/core/load_balancing/subchannel_list.h', 'src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h', + 'src/core/load_balancing/weighted_target/weighted_target.h', 'src/core/load_balancing/xds/xds_channel_args.h', 'src/core/load_balancing/xds/xds_override_host.h', 'src/core/resolver/dns/c_ares/dns_resolver_ares.h', diff --git a/grpc.gemspec b/grpc.gemspec index c46ae2adcca..2946a7b2c65 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1161,6 +1161,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/connected_channel.cc ) s.files += %w( src/core/lib/channel/connected_channel.h ) s.files += %w( src/core/lib/channel/context.h ) + s.files += %w( src/core/lib/channel/metrics.cc ) + s.files += %w( src/core/lib/channel/metrics.h ) s.files += %w( src/core/lib/channel/promise_based_filter.cc ) s.files += %w( src/core/lib/channel/promise_based_filter.h ) s.files += %w( src/core/lib/channel/server_call_tracer_filter.cc ) @@ -1892,6 +1894,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h ) s.files += %w( src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc ) s.files += %w( src/core/load_balancing/weighted_target/weighted_target.cc ) + s.files += %w( src/core/load_balancing/weighted_target/weighted_target.h ) s.files += %w( src/core/load_balancing/xds/cds.cc ) s.files += %w( src/core/load_balancing/xds/xds_channel_args.h ) s.files += %w( src/core/load_balancing/xds/xds_cluster_impl.cc ) diff --git a/grpc.gyp b/grpc.gyp index 0e655558818..fdfece92980 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -705,6 +705,7 @@ 'src/core/lib/channel/channelz.cc', 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', + 'src/core/lib/channel/metrics.cc', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', @@ -1246,6 +1247,7 @@ 'src/core/lib/channel/channelz.cc', 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', + 'src/core/lib/channel/metrics.cc', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', @@ -2054,6 +2056,7 @@ 'src/core/lib/channel/channelz.cc', 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', + 'src/core/lib/channel/metrics.cc', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', diff --git a/package.xml b/package.xml index 6720e2ebe1e..a379ebb1d51 100644 --- a/package.xml +++ b/package.xml @@ -1143,6 +1143,8 @@ + + @@ -1874,6 +1876,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index deb1ccf5cab..a63d7ac0251 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3256,6 +3256,7 @@ grpc_cc_library( "error", "grpc_backend_metric_data", "iomgr_fwd", + "metrics", "pollset_set", "ref_counted", "resolved_address", @@ -5680,6 +5681,7 @@ grpc_cc_library( "connectivity_state", "experiments", "grpc_backend_metric_data", + "grpc_lb_policy_weighted_target", "grpc_lb_subchannel_list", "json", "json_args", @@ -5687,6 +5689,7 @@ grpc_cc_library( "lb_endpoint_list", "lb_policy", "lb_policy_factory", + "metrics", "ref_counted", "resolved_address", "static_stride_scheduler", @@ -5826,6 +5829,9 @@ grpc_cc_library( srcs = [ "load_balancing/weighted_target/weighted_target.cc", ], + hdrs = [ + "load_balancing/weighted_target/weighted_target.h", + ], external_deps = [ "absl/base:core_headers", "absl/meta:type_traits", diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index 42ad79f73d5..5068ed57ba1 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -1148,6 +1148,8 @@ class ClientChannelFilter::ClientChannelControlHelper chand_->resolver_->RequestReresolutionLocked(); } + absl::string_view GetTarget() override { return chand_->target_uri_; } + absl::string_view GetAuthority() override { return chand_->default_authority_; } @@ -1166,6 +1168,10 @@ class ClientChannelFilter::ClientChannelControlHelper return chand_->owning_stack_->EventEngine(); } + GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override { + return *chand_->owning_stack_->stats_plugin_group; + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { if (chand_->resolver_ == nullptr) return; // Shutting down. @@ -1270,18 +1276,19 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args, } default_service_config_ = std::move(*service_config); // Get URI to resolve, using proxy mapper if needed. - absl::optional server_uri = + absl::optional target_uri = channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI); - if (!server_uri.has_value()) { + if (!target_uri.has_value()) { *error = GRPC_ERROR_CREATE( "target URI channel arg missing or wrong type in client channel " "filter"); return; } + target_uri_ = std::move(*target_uri); uri_to_resolve_ = CoreConfiguration::Get() .proxy_mapper_registry() - .MapName(*server_uri, &channel_args_) - .value_or(*server_uri); + .MapName(target_uri_, &channel_args_) + .value_or(target_uri_); // Make sure the URI to resolve is valid, so that we know that // resolver creation will succeed later. if (!CoreConfiguration::Get().resolver_registry().IsValidTarget( @@ -1306,7 +1313,7 @@ ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args, if (!default_authority.has_value()) { default_authority_ = CoreConfiguration::Get().resolver_registry().GetDefaultAuthority( - *server_uri); + target_uri_); } else { default_authority_ = std::move(*default_authority); } diff --git a/src/core/client_channel/client_channel_filter.h b/src/core/client_channel/client_channel_filter.h index c904c8f23e2..693366a5b0a 100644 --- a/src/core/client_channel/client_channel_filter.h +++ b/src/core/client_channel/client_channel_filter.h @@ -295,6 +295,7 @@ class ClientChannelFilter { grpc_channel_stack* owning_stack_; ClientChannelFactory* client_channel_factory_; RefCountedPtr default_service_config_; + std::string target_uri_; std::string uri_to_resolve_; std::string default_authority_; channelz::ChannelNode* channelz_node_; diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index ee1c1d9e943..a3b0307982e 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -129,6 +129,7 @@ grpc_error_handle grpc_channel_stack_init( stack->on_destroy.Init([]() {}); stack->event_engine.Init(channel_args.GetObjectRef()); + stack->stats_plugin_group.Init(); size_t call_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + @@ -188,6 +189,7 @@ void grpc_channel_stack_destroy(grpc_channel_stack* stack) { (*stack->on_destroy)(); stack->on_destroy.Destroy(); stack->event_engine.Destroy(); + stack->stats_plugin_group.Destroy(); } grpc_error_handle grpc_call_stack_init( diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 32c67017b3c..e32b0a4ae0a 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -61,6 +61,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/context.h" +#include "src/core/lib/channel/metrics.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gprpp/manual_constructor.h" @@ -219,6 +220,10 @@ struct grpc_channel_stack { return event_engine->get(); } + grpc_core::ManualConstructor< + grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup> + stats_plugin_group; + // Minimal infrastructure to act like a RefCounted thing without converting // everything. // It's likely that we'll want to replace grpc_channel_stack with something diff --git a/src/core/lib/channel/metrics.cc b/src/core/lib/channel/metrics.cc index 3e108580e38..36e17d2afc9 100644 --- a/src/core/lib/channel/metrics.cc +++ b/src/core/lib/channel/metrics.cc @@ -21,18 +21,17 @@ #include "src/core/lib/gprpp/crash.h" namespace grpc_core { -namespace { + // Uses the Construct-on-First-Use idiom to avoid the static initialization // order fiasco. absl::flat_hash_map& -GetInstrumentList() { +GlobalInstrumentsRegistry::GetInstrumentList() { static NoDestruct> instruments; return *instruments; } -} // namespace GlobalInstrumentsRegistry::GlobalUInt64CounterHandle GlobalInstrumentsRegistry::RegisterUInt64Counter( @@ -157,11 +156,6 @@ void GlobalInstrumentsRegistry::ForEach( } } -void GlobalInstrumentsRegistry::TestOnlyResetGlobalInstrumentsRegistry() { - auto& instruments = GetInstrumentList(); - instruments.clear(); -} - NoDestruct GlobalStatsPluginRegistry::mutex_; NoDestruct>> GlobalStatsPluginRegistry::plugins_; diff --git a/src/core/lib/channel/metrics.h b/src/core/lib/channel/metrics.h index 994ba36f5aa..51803e7c4fa 100644 --- a/src/core/lib/channel/metrics.h +++ b/src/core/lib/channel/metrics.h @@ -21,6 +21,7 @@ #include #include +#include "absl/container/flat_hash_map.h" #include "absl/functional/function_ref.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" @@ -33,6 +34,8 @@ namespace grpc_core { +constexpr absl::string_view kMetricLabelTarget = "grpc.target"; + // A global registry of instruments(metrics). This API is designed to be used to // register instruments (Counter and Histogram) as part of program startup, // before the execution of the main function (during dynamic initialization @@ -97,9 +100,14 @@ class GlobalInstrumentsRegistry { static void ForEach( absl::FunctionRef f); - static void TestOnlyResetGlobalInstrumentsRegistry(); + private: + friend class GlobalInstrumentsRegistryTestPeer; GlobalInstrumentsRegistry() = delete; + + static absl::flat_hash_map< + absl::string_view, GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>& + GetInstrumentList(); }; // The StatsPlugin interface. @@ -213,12 +221,9 @@ class GlobalStatsPluginRegistry { // TODO(yijiem): Implement this. // StatsPluginsGroup GetStatsPluginsForServer(ChannelArgs& args); - static void TestOnlyResetGlobalStatsPluginRegistry() { - MutexLock lock(&*mutex_); - plugins_->clear(); - } - private: + friend class GlobalStatsPluginRegistryTestPeer; + GlobalStatsPluginRegistry() = default; static NoDestruct mutex_; diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc index 7886baba05e..0c6f1ef460a 100644 --- a/src/core/lib/surface/channel.cc +++ b/src/core/lib/surface/channel.cc @@ -42,6 +42,7 @@ #include "src/core/lib/channel/channel_stack_builder_impl.h" #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/channel/metrics.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" @@ -160,6 +161,12 @@ absl::StatusOr> Channel::CreateWithBuilder( *enabled_algorithms_bitset | 1 /* always support no compression */; } + // TODO(roth): Populate authority after merging + // https://github.com/grpc/grpc/pull/35924. + StatsPlugin::ChannelScope scope(builder->target(), ""); + *(*r)->stats_plugin_group = + GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope); + return RefCountedPtr(new Channel( grpc_channel_stack_type_is_client(builder->channel_stack_type()), builder->IsPromising(), std::string(builder->target()), channel_args, diff --git a/src/core/load_balancing/delegating_helper.h b/src/core/load_balancing/delegating_helper.h index 389fdea4542..e0f3885a326 100644 --- a/src/core/load_balancing/delegating_helper.h +++ b/src/core/load_balancing/delegating_helper.h @@ -58,6 +58,10 @@ class LoadBalancingPolicy::DelegatingChannelControlHelper parent_helper()->RequestReresolution(); } + absl::string_view GetTarget() override { + return parent_helper()->GetTarget(); + } + absl::string_view GetAuthority() override { return parent_helper()->GetAuthority(); } @@ -75,6 +79,10 @@ class LoadBalancingPolicy::DelegatingChannelControlHelper return parent_helper()->GetEventEngine(); } + GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override { + return parent_helper()->GetStatsPluginGroup(); + } + void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { parent_helper()->AddTraceEvent(severity, message); diff --git a/src/core/load_balancing/lb_policy.h b/src/core/load_balancing/lb_policy.h index 54d30446ae5..a0e10e9eae6 100644 --- a/src/core/load_balancing/lb_policy.h +++ b/src/core/load_balancing/lb_policy.h @@ -40,6 +40,7 @@ #include "src/core/load_balancing/backend_metric_data.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/metrics.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/dual_ref_counted.h" @@ -299,6 +300,9 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Requests that the resolver re-resolve. virtual void RequestReresolution() = 0; + /// Returns the channel target. + virtual absl::string_view GetTarget() = 0; + /// Returns the channel authority. virtual absl::string_view GetAuthority() = 0; @@ -319,6 +323,10 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// Returns the EventEngine to use for timers and async work. virtual grpc_event_engine::experimental::EventEngine* GetEventEngine() = 0; + /// Returns the stats plugin group for reporting metrics. + virtual GlobalStatsPluginRegistry::StatsPluginGroup& + GetStatsPluginGroup() = 0; + /// Adds a trace message associated with the channel. enum TraceSeverity { TRACE_INFO, TRACE_WARNING, TRACE_ERROR }; virtual void AddTraceEvent(TraceSeverity severity, diff --git a/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc b/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc index d52e3d95908..9a049a5a03e 100644 --- a/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc @@ -51,8 +51,10 @@ #include "src/core/load_balancing/oob_backend_metric.h" #include "src/core/load_balancing/subchannel_list.h" #include "src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h" +#include "src/core/load_balancing/weighted_target/weighted_target.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/metrics.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" @@ -86,6 +88,41 @@ namespace { constexpr absl::string_view kWeightedRoundRobin = "weighted_round_robin"; +constexpr absl::string_view kMetricLabelLocality = "grpc.lb.locality"; + +const auto kMetricRrFallback = + GlobalInstrumentsRegistry::RegisterUInt64Counter( + "grpc.lb.wrr.rr_fallback", + "EXPERIMENTAL. Number of scheduler updates in which there were not " + "enough endpoints with valid weight, which caused the WRR policy to " + "fall back to RR behavior.", + "{update}", {kMetricLabelTarget}, {kMetricLabelLocality}, false); + +const auto kMetricEndpointWeightNotYetUsable = + GlobalInstrumentsRegistry::RegisterUInt64Counter( + "grpc.lb.wrr.endpoint_weight_not_yet_usable", + "EXPERIMENTAL. Number of endpoints from each scheduler update that " + "don't yet have usable weight information (i.e., either the load " + "report has not yet been received, or it is within the blackout " + "period).", + "{endpoint}", {kMetricLabelTarget}, {kMetricLabelLocality}, false); + +const auto kMetricEndpointWeightStale = + GlobalInstrumentsRegistry::RegisterUInt64Counter( + "grpc.lb.wrr.endpoint_weight_stale", + "EXPERIMENTAL. Number of endpoints from each scheduler update whose " + "latest weight is older than the expiration period.", + "{endpoint}", {kMetricLabelTarget}, {kMetricLabelLocality}, false); + +const auto kMetricEndpointWeights = + GlobalInstrumentsRegistry::RegisterDoubleHistogram( + "grpc.lb.wrr.endpoint_weights", + "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges. " + "Each bucket will be a counter that is incremented once for every " + "endpoint whose weight is within that range. Note that endpoints " + "without usable weights will have weight 0.", + "{weight}", {kMetricLabelTarget}, {kMetricLabelLocality}, false); + // Config for WRR policy. class WeightedRoundRobinConfig : public LoadBalancingPolicy::Config { public: @@ -1021,7 +1058,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy { float error_utilization_penalty); float GetWeight(Timestamp now, Duration weight_expiration_period, - Duration blackout_period); + Duration blackout_period, uint64_t* num_not_yet_usable, + uint64_t* num_stale); void ResetNonEmptySince(); @@ -1215,6 +1253,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy { std::map endpoint_weight_map_ ABSL_GUARDED_BY(&endpoint_weight_map_mu_); + const absl::string_view locality_name_; + bool shutdown_ = false; absl::BitGen bit_gen_; @@ -1276,8 +1316,8 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight( } float WeightedRoundRobin::EndpointWeight::GetWeight( - Timestamp now, Duration weight_expiration_period, - Duration blackout_period) { + Timestamp now, Duration weight_expiration_period, Duration blackout_period, + uint64_t* num_not_yet_usable, uint64_t* num_stale) { MutexLock lock(&mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, @@ -1294,12 +1334,14 @@ float WeightedRoundRobin::EndpointWeight::GetWeight( // period, reset non_empty_since_ so that we apply the blackout period // again if we start getting data again in the future, and return 0. if (now - last_update_time_ >= weight_expiration_period) { + ++*num_stale; non_empty_since_ = Timestamp::InfFuture(); return 0; } // If we don't have at least blackout_period worth of data, return 0. if (blackout_period > Duration::Zero() && now - non_empty_since_ < blackout_period) { + ++*num_not_yet_usable; return 0; } // Otherwise, return the weight. @@ -1418,14 +1460,28 @@ size_t WeightedRoundRobin::Picker::PickIndex() { } void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { - // Build scheduler. + auto& stats_plugins = wrr_->channel_control_helper()->GetStatsPluginGroup(); + // Build scheduler, reporting metrics on endpoint weights. const Timestamp now = Timestamp::Now(); std::vector weights; weights.reserve(endpoints_.size()); + uint64_t num_not_yet_usable = 0; + uint64_t num_stale = 0; for (const auto& endpoint : endpoints_) { - weights.push_back(endpoint.weight->GetWeight( - now, config_->weight_expiration_period(), config_->blackout_period())); - } + float weight = endpoint.weight->GetWeight( + now, config_->weight_expiration_period(), config_->blackout_period(), + &num_not_yet_usable, &num_stale); + weights.push_back(weight); + stats_plugins.RecordHistogram( + kMetricEndpointWeights, weight, + {wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_}); + } + stats_plugins.AddCounter( + kMetricEndpointWeightNotYetUsable, num_not_yet_usable, + {wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_}); + stats_plugins.AddCounter( + kMetricEndpointWeightStale, num_stale, + {wrr_->channel_control_helper()->GetTarget()}, {wrr_->locality_name_}); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this, absl::StrJoin(weights, " ").c_str()); @@ -1440,9 +1496,14 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { gpr_log(GPR_INFO, "[WRR %p picker %p] new scheduler: %p", wrr_.get(), this, scheduler.get()); } - } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR", - wrr_.get(), this); + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { + gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR", + wrr_.get(), this); + } + stats_plugins.AddCounter( + kMetricRrFallback, 1, {wrr_->channel_control_helper()->GetTarget()}, + {wrr_->locality_name_}); } { MutexLock lock(&scheduler_mu_); @@ -1483,9 +1544,13 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { // WeightedRoundRobin::WeightedRoundRobin(Args args) - : LoadBalancingPolicy(std::move(args)) { + : LoadBalancingPolicy(std::move(args)), + locality_name_(channel_args() + .GetString(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD) + .value_or("")) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p] Created", this); + gpr_log(GPR_INFO, "[WRR %p] Created -- locality_name=\"%s\"", this, + std::string(locality_name_).c_str()); } } diff --git a/src/core/load_balancing/weighted_target/weighted_target.cc b/src/core/load_balancing/weighted_target/weighted_target.cc index fe951c308ca..6621436a44d 100644 --- a/src/core/load_balancing/weighted_target/weighted_target.cc +++ b/src/core/load_balancing/weighted_target/weighted_target.cc @@ -16,6 +16,8 @@ #include +#include "src/core/load_balancing/weighted_target/weighted_target.h" + #include #include @@ -160,7 +162,7 @@ class WeightedTargetLb : public LoadBalancingPolicy { absl::Status UpdateLocked( const WeightedTargetLbConfig::ChildConfig& config, absl::StatusOr> addresses, - const std::string& resolution_note, const ChannelArgs& args); + const std::string& resolution_note, ChannelArgs args); void ResetBackoffLocked(); void DeactivateLocked(); @@ -592,7 +594,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( absl::Status WeightedTargetLb::WeightedChild::UpdateLocked( const WeightedTargetLbConfig::ChildConfig& config, absl::StatusOr> addresses, - const std::string& resolution_note, const ChannelArgs& args) { + const std::string& resolution_note, ChannelArgs args) { if (weighted_target_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. if (weight_ != config.weight && @@ -611,6 +613,7 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked( delayed_removal_timer_.reset(); } // Create child policy if needed. + args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_); if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args); } @@ -619,7 +622,7 @@ absl::Status WeightedTargetLb::WeightedChild::UpdateLocked( update_args.config = config.config; update_args.addresses = std::move(addresses); update_args.resolution_note = resolution_note; - update_args.args = args; + update_args.args = std::move(args); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) { gpr_log(GPR_INFO, diff --git a/src/core/load_balancing/weighted_target/weighted_target.h b/src/core/load_balancing/weighted_target/weighted_target.h new file mode 100644 index 00000000000..2a1c4f17086 --- /dev/null +++ b/src/core/load_balancing/weighted_target/weighted_target.h @@ -0,0 +1,28 @@ +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H +#define GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H + +#include + +#include "src/core/resolver/endpoint_addresses.h" + +// Channel arg key indicating the weighted_target child name. +#define GRPC_ARG_LB_WEIGHTED_TARGET_CHILD \ + GRPC_ARG_NO_SUBCHANNEL_PREFIX "lb_weighted_target_child" + +#endif // GRPC_SRC_CORE_LOAD_BALANCING_WEIGHTED_TARGET_WEIGHTED_TARGET_H diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a171c782d35..faa41be4548 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -448,6 +448,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channelz.cc', 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', + 'src/core/lib/channel/metrics.cc', 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/server_call_tracer_filter.cc', 'src/core/lib/channel/status_util.cc', diff --git a/test/core/channel/BUILD b/test/core/channel/BUILD index ada41687228..dc36704723c 100644 --- a/test/core/channel/BUILD +++ b/test/core/channel/BUILD @@ -204,6 +204,7 @@ grpc_cc_test( uses_polling = False, deps = [ "//src/core:metrics", + "//test/core/util:fake_stats_plugin", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/channel/metrics_test.cc b/test/core/channel/metrics_test.cc index eca4cabcd32..e27a8d09350 100644 --- a/test/core/channel/metrics_test.cc +++ b/test/core/channel/metrics_test.cc @@ -23,299 +23,17 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "test/core/util/fake_stats_plugin.h" #include "test/core/util/test_config.h" namespace grpc_core { namespace { -void AddKeyValuePairs(absl::Span keys, - absl::Span values, - std::vector* key_value_pairs) { - GPR_ASSERT(keys.size() == values.size()); - for (size_t i = 0; i < keys.size(); ++i) { - key_value_pairs->push_back(absl::StrCat(keys[i], "=", values[i])); - } -} - -std::string MakeLabelString( - absl::Span label_keys, - absl::Span label_values, - absl::Span optional_label_keys, - absl::Span optional_values) { - std::vector key_value_pairs; - AddKeyValuePairs(label_keys, label_values, &key_value_pairs); - AddKeyValuePairs(optional_label_keys, optional_values, &key_value_pairs); - return absl::StrJoin(key_value_pairs, ","); -} - -// TODO(yijiem): Move this to test/core/util/fake_stats_plugin.h -class FakeStatsPlugin : public StatsPlugin { - public: - bool IsEnabledForChannel( - const StatsPlugin::ChannelScope& scope) const override { - return channel_filter_(scope); - } - - bool IsEnabledForServer(const ChannelArgs& /*args*/) const override { - return false; - } - - void AddCounter( - GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle, - uint64_t value, absl::Span label_values, - absl::Span optional_values) override { - // The problem with this approach is that we initialize uint64_counters_ in - // BuildAndRegister by querying the GlobalInstrumentsRegistry at the time. - // If the GlobalInstrumentsRegistry has changed since then (which we - // currently don't allow), we might not have seen that descriptor nor have - // we created an instrument for it. We probably could copy the existing - // instruments at build time and for the handle that we haven't seen we will - // just ignore it here. This would also prevent us from having to lock the - // GlobalInstrumentsRegistry everytime a metric is recorded. But this is not - // a concern for now. - auto iter = uint64_counters_.find(handle.index); - if (iter == uint64_counters_.end()) { - return; - } - iter->second.Add(value, label_values, optional_values); - } - void AddCounter( - GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, double value, - absl::Span label_values, - absl::Span optional_values) override { - auto iter = double_counters_.find(handle.index); - if (iter == double_counters_.end()) { - return; - } - iter->second.Add(value, label_values, optional_values); - } - void RecordHistogram( - GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle, - uint64_t value, absl::Span label_values, - absl::Span optional_values) override { - auto iter = uint64_histograms_.find(handle.index); - if (iter == uint64_histograms_.end()) { - return; - } - iter->second.Record(value, label_values, optional_values); - } - void RecordHistogram( - GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle, - double value, absl::Span label_values, - absl::Span optional_values) override { - auto iter = double_histograms_.find(handle.index); - if (iter == double_histograms_.end()) { - return; - } - iter->second.Record(value, label_values, optional_values); - } - - absl::optional GetCounterValue( - GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle, - absl::Span label_values, - absl::Span optional_values) { - auto iter = uint64_counters_.find(handle.index); - if (iter == uint64_counters_.end()) { - return absl::nullopt; - } - return iter->second.GetValue(label_values, optional_values); - } - absl::optional GetCounterValue( - GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, - absl::Span label_values, - absl::Span optional_values) { - auto iter = double_counters_.find(handle.index); - if (iter == double_counters_.end()) { - return absl::nullopt; - } - return iter->second.GetValue(label_values, optional_values); - } - absl::optional> GetHistogramValue( - GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle, - absl::Span label_values, - absl::Span optional_values) { - auto iter = uint64_histograms_.find(handle.index); - if (iter == uint64_histograms_.end()) { - return absl::nullopt; - } - return iter->second.GetValues(label_values, optional_values); - } - absl::optional> GetHistogramValue( - GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle, - absl::Span label_values, - absl::Span optional_values) { - auto iter = double_histograms_.find(handle.index); - if (iter == double_histograms_.end()) { - return absl::nullopt; - } - return iter->second.GetValues(label_values, optional_values); - } - - private: - friend class FakeStatsPluginBuilder; - - explicit FakeStatsPlugin( - absl::AnyInvocable - channel_filter) - : channel_filter_(std::move(channel_filter)) { - GlobalInstrumentsRegistry::ForEach( - [this](const GlobalInstrumentsRegistry::GlobalInstrumentDescriptor& - descriptor) { - if (!descriptor.enable_by_default) { - return; - } - if (descriptor.instrument_type == - GlobalInstrumentsRegistry::InstrumentType::kCounter) { - if (descriptor.value_type == - GlobalInstrumentsRegistry::ValueType::kUInt64) { - uint64_counters_.emplace(descriptor.index, descriptor); - } else { - double_counters_.emplace(descriptor.index, descriptor); - } - } else { - EXPECT_EQ(descriptor.instrument_type, - GlobalInstrumentsRegistry::InstrumentType::kHistogram); - if (descriptor.value_type == - GlobalInstrumentsRegistry::ValueType::kUInt64) { - uint64_histograms_.emplace(descriptor.index, descriptor); - } else { - double_histograms_.emplace(descriptor.index, descriptor); - } - } - }); - } - - template - class Counter { - public: - explicit Counter(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u) - : name_(u.name), - description_(u.description), - unit_(u.unit), - label_keys_(std::move(u.label_keys)), - optional_label_keys_(std::move(u.optional_label_keys)) {} - - void Add(T t, absl::Span label_values, - absl::Span optional_values) { - auto iter = storage_.find(MakeLabelString( - label_keys_, label_values, optional_label_keys_, optional_values)); - if (iter != storage_.end()) { - iter->second += t; - } else { - storage_[MakeLabelString(label_keys_, label_values, - optional_label_keys_, optional_values)] = t; - } - } - - absl::optional GetValue( - absl::Span label_values, - absl::Span optional_values) { - auto iter = storage_.find(MakeLabelString( - label_keys_, label_values, optional_label_keys_, optional_values)); - if (iter == storage_.end()) { - return absl::nullopt; - } - return iter->second; - } - - private: - absl::string_view name_; - absl::string_view description_; - absl::string_view unit_; - std::vector label_keys_; - std::vector optional_label_keys_; - // Aggregation of the same key attributes. - absl::flat_hash_map storage_; - }; - - template - class Histogram { - public: - explicit Histogram(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u) - : name_(u.name), - description_(u.description), - unit_(u.unit), - label_keys_(std::move(u.label_keys)), - optional_label_keys_(std::move(u.optional_label_keys)) {} - - void Record(T t, absl::Span label_values, - absl::Span optional_values) { - std::string key = MakeLabelString(label_keys_, label_values, - optional_label_keys_, optional_values); - auto iter = storage_.find(key); - if (iter == storage_.end()) { - storage_.emplace(key, std::initializer_list{t}); - } else { - iter->second.push_back(t); - } - } - - absl::optional> GetValues( - absl::Span label_values, - absl::Span optional_values) { - auto iter = storage_.find(MakeLabelString( - label_keys_, label_values, optional_label_keys_, optional_values)); - if (iter == storage_.end()) { - return absl::nullopt; - } - return iter->second; - } - - private: - absl::string_view name_; - absl::string_view description_; - absl::string_view unit_; - std::vector label_keys_; - std::vector optional_label_keys_; - absl::flat_hash_map> storage_; - }; - - absl::AnyInvocable - channel_filter_; - // Instruments. - absl::flat_hash_map> uint64_counters_; - absl::flat_hash_map> double_counters_; - absl::flat_hash_map> uint64_histograms_; - absl::flat_hash_map> double_histograms_; -}; - -// TODO(yijiem): Move this to test/core/util/fake_stats_plugin.h -class FakeStatsPluginBuilder { - public: - FakeStatsPluginBuilder& SetChannelFilter( - absl::AnyInvocable - channel_filter) { - channel_filter_ = std::move(channel_filter); - return *this; - } - - std::shared_ptr BuildAndRegister() { - auto f = std::shared_ptr( - new FakeStatsPlugin(std::move(channel_filter_))); - GlobalStatsPluginRegistry::RegisterStatsPlugin(f); - return f; - } - - private: - absl::AnyInvocable - channel_filter_; -}; - -std::shared_ptr MakeStatsPluginForTarget( - absl::string_view target_suffix) { - return FakeStatsPluginBuilder() - .SetChannelFilter( - [target_suffix](const StatsPlugin::ChannelScope& scope) { - return absl::EndsWith(scope.target(), target_suffix); - }) - .BuildAndRegister(); -} - -class MetricsTest : public testing::Test { +class MetricsTest : public ::testing::Test { public: void TearDown() override { - GlobalInstrumentsRegistry::TestOnlyResetGlobalInstrumentsRegistry(); - GlobalStatsPluginRegistry::TestOnlyResetGlobalStatsPluginRegistry(); + GlobalInstrumentsRegistryTestPeer::ResetGlobalInstrumentsRegistry(); + GlobalStatsPluginRegistryTestPeer::ResetGlobalStatsPluginRegistry(); } }; diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD index d0422a50a40..1f5d398a76f 100644 --- a/test/core/client_channel/lb_policy/BUILD +++ b/test/core/client_channel/lb_policy/BUILD @@ -219,6 +219,7 @@ grpc_cc_test( uses_polling = False, deps = [ "//:grpc", + "//test/core/util:fake_stats_plugin", "//test/core/util:grpc_test_util", ], ) @@ -235,6 +236,7 @@ grpc_cc_test( deps = [ ":lb_policy_test_lib", "//src/core:grpc_lb_policy_weighted_round_robin", + "//test/core/util:fake_stats_plugin", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index e7c49e794d0..7048fec08d3 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -587,7 +587,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { queue_.push_back(ReresolutionRequested()); } - absl::string_view GetAuthority() override { return "server.example.com"; } + absl::string_view GetTarget() override { return test_->target_; } + + absl::string_view GetAuthority() override { return test_->authority_; } RefCountedPtr GetChannelCredentials() override { return nullptr; @@ -602,6 +604,11 @@ class LoadBalancingPolicyTest : public ::testing::Test { return test_->fuzzing_ee_.get(); } + GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() + override { + return test_->stats_plugin_group_; + } + void AddTraceEvent(TraceSeverity, absl::string_view) override {} LoadBalancingPolicyTest* test_; @@ -1494,6 +1501,9 @@ class LoadBalancingPolicyTest : public ::testing::Test { OrphanablePtr lb_policy_; const absl::string_view lb_policy_name_; const ChannelArgs channel_args_; + GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_; + std::string target_ = "dns:server.example.com"; + std::string authority_ = "server.example.com"; }; } // namespace testing diff --git a/test/core/client_channel/lb_policy/weighted_round_robin_test.cc b/test/core/client_channel/lb_policy/weighted_round_robin_test.cc index a6cbcbe2c1a..4572429513e 100644 --- a/test/core/client_channel/lb_policy/weighted_round_robin_test.cc +++ b/test/core/client_channel/lb_policy/weighted_round_robin_test.cc @@ -49,14 +49,18 @@ #include "src/core/lib/json/json_writer.h" #include "src/core/load_balancing/backend_metric_data.h" #include "src/core/load_balancing/lb_policy.h" +#include "src/core/load_balancing/weighted_target/weighted_target.h" #include "src/core/resolver/endpoint_addresses.h" #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" +#include "test/core/util/fake_stats_plugin.h" #include "test/core/util/test_config.h" namespace grpc_core { namespace testing { namespace { +constexpr absl::string_view kLocalityName = "locality0"; + class WeightedRoundRobinTest : public LoadBalancingPolicyTest { protected: class ConfigBuilder { @@ -103,7 +107,11 @@ class WeightedRoundRobinTest : public LoadBalancingPolicyTest { Json::Object json_; }; - WeightedRoundRobinTest() : LoadBalancingPolicyTest("weighted_round_robin") {} + WeightedRoundRobinTest() + : LoadBalancingPolicyTest( + "weighted_round_robin", + ChannelArgs().Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, + kLocalityName)) {} void SetUp() override { LoadBalancingPolicyTest::SetUp(); @@ -989,6 +997,173 @@ TEST_F(WeightedRoundRobinTest, MultipleAddressesPerEndpoint) { EXPECT_FALSE(subchannel3_1->ConnectionRequested()); } +TEST_F(WeightedRoundRobinTest, MetricDefinitionRrFallback) { + const auto* descriptor = + GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName( + "grpc.lb.wrr.rr_fallback"); + ASSERT_NE(descriptor, nullptr); + EXPECT_EQ(descriptor->value_type, + GlobalInstrumentsRegistry::ValueType::kUInt64); + EXPECT_EQ(descriptor->instrument_type, + GlobalInstrumentsRegistry::InstrumentType::kCounter); + EXPECT_EQ(descriptor->enable_by_default, false); + EXPECT_EQ(descriptor->name, "grpc.lb.wrr.rr_fallback"); + EXPECT_EQ(descriptor->unit, "{update}"); + EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target")); + EXPECT_THAT(descriptor->optional_label_keys, + ::testing::ElementsAre("grpc.lb.locality")); +} + +TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightNotYetUsable) { + const auto* descriptor = + GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName( + "grpc.lb.wrr.endpoint_weight_not_yet_usable"); + ASSERT_NE(descriptor, nullptr); + EXPECT_EQ(descriptor->value_type, + GlobalInstrumentsRegistry::ValueType::kUInt64); + EXPECT_EQ(descriptor->instrument_type, + GlobalInstrumentsRegistry::InstrumentType::kCounter); + EXPECT_EQ(descriptor->enable_by_default, false); + EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_not_yet_usable"); + EXPECT_EQ(descriptor->unit, "{endpoint}"); + EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target")); + EXPECT_THAT(descriptor->optional_label_keys, + ::testing::ElementsAre("grpc.lb.locality")); +} + +TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeightStale) { + const auto* descriptor = + GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName( + "grpc.lb.wrr.endpoint_weight_stale"); + ASSERT_NE(descriptor, nullptr); + EXPECT_EQ(descriptor->value_type, + GlobalInstrumentsRegistry::ValueType::kUInt64); + EXPECT_EQ(descriptor->instrument_type, + GlobalInstrumentsRegistry::InstrumentType::kCounter); + EXPECT_EQ(descriptor->enable_by_default, false); + EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weight_stale"); + EXPECT_EQ(descriptor->unit, "{endpoint}"); + EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target")); + EXPECT_THAT(descriptor->optional_label_keys, + ::testing::ElementsAre("grpc.lb.locality")); +} + +TEST_F(WeightedRoundRobinTest, MetricDefinitionEndpointWeights) { + const auto* descriptor = + GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName( + "grpc.lb.wrr.endpoint_weights"); + ASSERT_NE(descriptor, nullptr); + EXPECT_EQ(descriptor->value_type, + GlobalInstrumentsRegistry::ValueType::kDouble); + EXPECT_EQ(descriptor->instrument_type, + GlobalInstrumentsRegistry::InstrumentType::kHistogram); + EXPECT_EQ(descriptor->enable_by_default, false); + EXPECT_EQ(descriptor->name, "grpc.lb.wrr.endpoint_weights"); + EXPECT_EQ(descriptor->unit, "{weight}"); + EXPECT_THAT(descriptor->label_keys, ::testing::ElementsAre("grpc.target")); + EXPECT_THAT(descriptor->optional_label_keys, + ::testing::ElementsAre("grpc.lb.locality")); +} + +TEST_F(WeightedRoundRobinTest, MetricValues) { + if (!IsWrrDelegateToPickFirstEnabled()) return; + const auto kRrFallback = + GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName( + "grpc.lb.wrr.rr_fallback") + .value(); + const auto kEndpointWeightNotYetUsable = + GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName( + "grpc.lb.wrr.endpoint_weight_not_yet_usable") + .value(); + const auto kEndpointWeightStale = + GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName( + "grpc.lb.wrr.endpoint_weight_stale") + .value(); + const auto kEndpointWeights = + GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName( + "grpc.lb.wrr.endpoint_weights") + .value(); + const absl::string_view kLabelValues[] = {target_}; + const absl::string_view kOptionalLabelValues[] = {kLocalityName}; + auto stats_plugin = std::make_shared( + nullptr, /*use_disabled_by_default_metrics=*/true); + stats_plugin_group_.push_back(stats_plugin); + // Send address list to LB policy. + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + auto picker = SendInitialUpdateAndWaitForConnected( + kAddresses, + ConfigBuilder().SetWeightExpirationPeriod(Duration::Seconds(2))); + ASSERT_NE(picker, nullptr); + // Address 0 gets weight 1, address 1 gets weight 3. + // No utilization report from backend 2, so it gets the average weight 2. + WaitForWeightedRoundRobinPicks( + &picker, + {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9, + /*qps=*/100.0, /*eps=*/0.0)}, + {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3, + /*qps=*/100.0, /*eps=*/0.0)}}, + {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 2}}); + // Now have backend 2 report utilization the same as backend 1, so its + // weight will be the same. + WaitForWeightedRoundRobinPicks( + &picker, + {{kAddresses[0], MakeBackendMetricData(/*app_utilization=*/0.9, + /*qps=*/100.0, /*eps=*/0.0)}, + {kAddresses[1], MakeBackendMetricData(/*app_utilization=*/0.3, + /*qps=*/100.0, /*eps=*/0.0)}, + {kAddresses[2], MakeBackendMetricData(/*app_utilization=*/0.3, + /*qps=*/100.0, /*eps=*/0.0)}}, + {{kAddresses[0], 1}, {kAddresses[1], 3}, {kAddresses[2], 3}}); + // Check endpoint weights. + EXPECT_THAT(stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues, + kOptionalLabelValues), + ::testing::Optional(::testing::ElementsAre( + // Picker created for first endpoint becoming READY. + 0, + // Picker update for second endpoint CONNECTING. + 0, + // Picker update for second endpoint READY. + 0, 0, + // Picker update for third endpoint CONNECTING. + 0, 0, + // Picker update for third endpoint READY. + 0, 0, 0, + // Weights for first two endpoints now start getting used. + ::testing::DoubleNear(111.111115, 0.000001), + ::testing::DoubleNear(333.333344, 0.000001), 0, + // Weights for all endpoints are now used. + ::testing::DoubleNear(111.111115, 0.000001), + ::testing::DoubleNear(333.333344, 0.000001), + ::testing::DoubleNear(333.333344, 0.000001)))); + // RR fallback should trigger for the first 5 updates above, because + // there are less than two endpoints with valid weights. + EXPECT_THAT(stats_plugin->GetCounterValue(kRrFallback, kLabelValues, + kOptionalLabelValues), + ::testing::Optional(5)); + // Endpoint-not-yet-usable will be incremented once for every endpoint + // with weight 0 above. + EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightNotYetUsable, + kLabelValues, kOptionalLabelValues), + ::testing::Optional(10)); + // There are no stale endpoint weights so far. + EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightStale, kLabelValues, + kOptionalLabelValues), + ::testing::Optional(0)); + // Advance time to make weights stale and trigger the timer callback + // to recompute weights. + gpr_log(GPR_INFO, "advancing time to trigger staleness..."); + IncrementTimeBy(Duration::Seconds(2)); + // Picker should now be falling back to round-robin. + ExpectWeightedRoundRobinPicks( + picker.get(), {}, + {{kAddresses[0], 3}, {kAddresses[1], 3}, {kAddresses[2], 3}}); + // All three endpoints should now have stale weights. + EXPECT_THAT(stats_plugin->GetCounterValue(kEndpointWeightStale, kLabelValues, + kOptionalLabelValues), + ::testing::Optional(3)); +} + } // namespace } // namespace testing } // namespace grpc_core diff --git a/test/core/util/BUILD b/test/core/util/BUILD index d8ace2ee84e..5122f7d589f 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -494,10 +494,21 @@ grpc_cc_library( grpc_cc_library( name = "fake_stats_plugin", + testonly = True, srcs = ["fake_stats_plugin.cc"], hdrs = ["fake_stats_plugin.h"], + external_deps = [ + "absl/container:flat_hash_map", + "absl/functional:any_invocable", + "absl/status", + "absl/strings", + "absl/types:optional", + "absl/types:span", + "gtest", + ], deps = [ "//:grpc", "//src/core:examine_stack", + "//src/core:metrics", ], ) diff --git a/test/core/util/fake_stats_plugin.cc b/test/core/util/fake_stats_plugin.cc index f8819461da4..9aa31783e0e 100644 --- a/test/core/util/fake_stats_plugin.cc +++ b/test/core/util/fake_stats_plugin.cc @@ -79,4 +79,109 @@ void RegisterFakeStatsPlugin() { }); } +namespace { + +void AddKeyValuePairs(absl::Span keys, + absl::Span values, + std::vector* key_value_pairs) { + GPR_ASSERT(keys.size() == values.size()); + for (size_t i = 0; i < keys.size(); ++i) { + key_value_pairs->push_back(absl::StrCat(keys[i], "=", values[i])); + } +} + +} // namespace + +std::string MakeLabelString( + absl::Span label_keys, + absl::Span label_values, + absl::Span optional_label_keys, + absl::Span optional_values) { + std::vector key_value_pairs; + AddKeyValuePairs(label_keys, label_values, &key_value_pairs); + AddKeyValuePairs(optional_label_keys, optional_values, &key_value_pairs); + return absl::StrJoin(key_value_pairs, ","); +} + +std::shared_ptr MakeStatsPluginForTarget( + absl::string_view target_suffix) { + return FakeStatsPluginBuilder() + .SetChannelFilter( + [target_suffix](const StatsPlugin::ChannelScope& scope) { + return absl::EndsWith(scope.target(), target_suffix); + }) + .BuildAndRegister(); +} + +void GlobalInstrumentsRegistryTestPeer::ResetGlobalInstrumentsRegistry() { + auto& instruments = GlobalInstrumentsRegistry::GetInstrumentList(); + instruments.clear(); +} + +namespace { + +template +absl::optional FindInstrument( + const absl::flat_hash_map< + absl::string_view, + GlobalInstrumentsRegistry::GlobalInstrumentDescriptor>& instruments, + absl::string_view name, GlobalInstrumentsRegistry::ValueType value_type, + GlobalInstrumentsRegistry::InstrumentType instrument_type) { + auto it = instruments.find(name); + if (it != instruments.end() && it->second.value_type == value_type && + it->second.instrument_type == instrument_type) { + HandleType handle; + handle.index = it->second.index; + return handle; + } + return absl::nullopt; +} + +} // namespace + +absl::optional +GlobalInstrumentsRegistryTestPeer::FindUInt64CounterHandleByName( + absl::string_view name) { + return FindInstrument( + GlobalInstrumentsRegistry::GetInstrumentList(), name, + GlobalInstrumentsRegistry::ValueType::kUInt64, + GlobalInstrumentsRegistry::InstrumentType::kCounter); +} + +absl::optional +GlobalInstrumentsRegistryTestPeer::FindDoubleCounterHandleByName( + absl::string_view name) { + return FindInstrument( + GlobalInstrumentsRegistry::GetInstrumentList(), name, + GlobalInstrumentsRegistry::ValueType::kDouble, + GlobalInstrumentsRegistry::InstrumentType::kCounter); +} + +absl::optional +GlobalInstrumentsRegistryTestPeer::FindUInt64HistogramHandleByName( + absl::string_view name) { + return FindInstrument( + GlobalInstrumentsRegistry::GetInstrumentList(), name, + GlobalInstrumentsRegistry::ValueType::kUInt64, + GlobalInstrumentsRegistry::InstrumentType::kHistogram); +} + +absl::optional +GlobalInstrumentsRegistryTestPeer::FindDoubleHistogramHandleByName( + absl::string_view name) { + return FindInstrument( + GlobalInstrumentsRegistry::GetInstrumentList(), name, + GlobalInstrumentsRegistry::ValueType::kDouble, + GlobalInstrumentsRegistry::InstrumentType::kHistogram); +} + +GlobalInstrumentsRegistry::GlobalInstrumentDescriptor* +GlobalInstrumentsRegistryTestPeer::FindMetricDescriptorByName( + absl::string_view name) { + auto& instruments = GlobalInstrumentsRegistry::GetInstrumentList(); + auto it = instruments.find(name); + if (it != instruments.end()) return &it->second; + return nullptr; +} + } // namespace grpc_core diff --git a/test/core/util/fake_stats_plugin.h b/test/core/util/fake_stats_plugin.h index 57a659ba4bd..a7a4751ebbd 100644 --- a/test/core/util/fake_stats_plugin.h +++ b/test/core/util/fake_stats_plugin.h @@ -19,7 +19,16 @@ #include #include +#include "absl/container/flat_hash_map.h" +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" +#include "absl/types/span.h" +#include "gmock/gmock.h" + #include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/channel/metrics.h" #include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/channel/tcp_tracer.h" @@ -187,6 +196,315 @@ class FakeServerCallTracer : public ServerCallTracer { std::vector* annotation_logger_; }; +std::string MakeLabelString( + absl::Span label_keys, + absl::Span label_values, + absl::Span optional_label_keys, + absl::Span optional_values); + +class FakeStatsPlugin : public StatsPlugin { + public: + explicit FakeStatsPlugin( + absl::AnyInvocable + channel_filter = nullptr, + bool use_disabled_by_default_metrics = false) + : channel_filter_(std::move(channel_filter)) { + GlobalInstrumentsRegistry::ForEach( + [&](const GlobalInstrumentsRegistry::GlobalInstrumentDescriptor& + descriptor) { + if (!use_disabled_by_default_metrics && + !descriptor.enable_by_default) { + gpr_log(GPR_INFO, + "FakeStatsPlugin[%p]: skipping disabled metric: %s", this, + std::string(descriptor.name).c_str()); + return; + } + if (descriptor.instrument_type == + GlobalInstrumentsRegistry::InstrumentType::kCounter) { + if (descriptor.value_type == + GlobalInstrumentsRegistry::ValueType::kUInt64) { + uint64_counters_.emplace(descriptor.index, descriptor); + } else { + double_counters_.emplace(descriptor.index, descriptor); + } + } else { + EXPECT_EQ(descriptor.instrument_type, + GlobalInstrumentsRegistry::InstrumentType::kHistogram); + if (descriptor.value_type == + GlobalInstrumentsRegistry::ValueType::kUInt64) { + uint64_histograms_.emplace(descriptor.index, descriptor); + } else { + double_histograms_.emplace(descriptor.index, descriptor); + } + } + }); + } + + bool IsEnabledForChannel(const ChannelScope& scope) const override { + if (channel_filter_ == nullptr) return true; + return channel_filter_(scope); + } + + bool IsEnabledForServer(const ChannelArgs& /*args*/) const override { + return false; + } + + void AddCounter( + GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle, + uint64_t value, absl::Span label_values, + absl::Span optional_values) override { + // The problem with this approach is that we initialize uint64_counters_ in + // BuildAndRegister by querying the GlobalInstrumentsRegistry at the time. + // If the GlobalInstrumentsRegistry has changed since then (which we + // currently don't allow), we might not have seen that descriptor nor have + // we created an instrument for it. We probably could copy the existing + // instruments at build time and for the handle that we haven't seen we will + // just ignore it here. This would also prevent us from having to lock the + // GlobalInstrumentsRegistry everytime a metric is recorded. But this is not + // a concern for now. + gpr_log(GPR_INFO, + "FakeStatsPlugin[%p]::AddCounter(index=%u, value=(uint64)%lu, " + "label_values={%s}, optional_label_values={%s}", + this, handle.index, value, + absl::StrJoin(label_values, ", ").c_str(), + absl::StrJoin(optional_values, ", ").c_str()); + auto iter = uint64_counters_.find(handle.index); + if (iter == uint64_counters_.end()) return; + iter->second.Add(value, label_values, optional_values); + } + void AddCounter( + GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, double value, + absl::Span label_values, + absl::Span optional_values) override { + gpr_log(GPR_INFO, + "FakeStatsPlugin[%p]::AddCounter(index=%u, value(double)=%f, " + "label_values={%s}, optional_label_values={%s}", + this, handle.index, value, + absl::StrJoin(label_values, ", ").c_str(), + absl::StrJoin(optional_values, ", ").c_str()); + auto iter = double_counters_.find(handle.index); + if (iter == double_counters_.end()) return; + iter->second.Add(value, label_values, optional_values); + } + void RecordHistogram( + GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle, + uint64_t value, absl::Span label_values, + absl::Span optional_values) override { + gpr_log(GPR_INFO, + "FakeStatsPlugin[%p]::RecordHistogram(index=%u, value=(uint64)%lu, " + "label_values={%s}, optional_label_values={%s}", + this, handle.index, value, + absl::StrJoin(label_values, ", ").c_str(), + absl::StrJoin(optional_values, ", ").c_str()); + auto iter = uint64_histograms_.find(handle.index); + if (iter == uint64_histograms_.end()) return; + iter->second.Record(value, label_values, optional_values); + } + void RecordHistogram( + GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle, + double value, absl::Span label_values, + absl::Span optional_values) override { + gpr_log(GPR_INFO, + "FakeStatsPlugin[%p]::RecordHistogram(index=%u, value=(double)%f, " + "label_values={%s}, optional_label_values={%s}", + this, handle.index, value, + absl::StrJoin(label_values, ", ").c_str(), + absl::StrJoin(optional_values, ", ").c_str()); + auto iter = double_histograms_.find(handle.index); + if (iter == double_histograms_.end()) return; + iter->second.Record(value, label_values, optional_values); + } + + absl::optional GetCounterValue( + GlobalInstrumentsRegistry::GlobalUInt64CounterHandle handle, + absl::Span label_values, + absl::Span optional_values) { + auto iter = uint64_counters_.find(handle.index); + if (iter == uint64_counters_.end()) { + return absl::nullopt; + } + return iter->second.GetValue(label_values, optional_values); + } + absl::optional GetCounterValue( + GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, + absl::Span label_values, + absl::Span optional_values) { + auto iter = double_counters_.find(handle.index); + if (iter == double_counters_.end()) { + return absl::nullopt; + } + return iter->second.GetValue(label_values, optional_values); + } + absl::optional> GetHistogramValue( + GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle, + absl::Span label_values, + absl::Span optional_values) { + auto iter = uint64_histograms_.find(handle.index); + if (iter == uint64_histograms_.end()) { + return absl::nullopt; + } + return iter->second.GetValues(label_values, optional_values); + } + absl::optional> GetHistogramValue( + GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle, + absl::Span label_values, + absl::Span optional_values) { + auto iter = double_histograms_.find(handle.index); + if (iter == double_histograms_.end()) { + return absl::nullopt; + } + return iter->second.GetValues(label_values, optional_values); + } + + private: + template + class Counter { + public: + explicit Counter(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u) + : name_(u.name), + description_(u.description), + unit_(u.unit), + label_keys_(std::move(u.label_keys)), + optional_label_keys_(std::move(u.optional_label_keys)) {} + + void Add(T t, absl::Span label_values, + absl::Span optional_values) { + auto iter = storage_.find(MakeLabelString( + label_keys_, label_values, optional_label_keys_, optional_values)); + if (iter != storage_.end()) { + iter->second += t; + } else { + storage_[MakeLabelString(label_keys_, label_values, + optional_label_keys_, optional_values)] = t; + } + } + + absl::optional GetValue( + absl::Span label_values, + absl::Span optional_values) { + auto iter = storage_.find(MakeLabelString( + label_keys_, label_values, optional_label_keys_, optional_values)); + if (iter == storage_.end()) { + return absl::nullopt; + } + return iter->second; + } + + private: + absl::string_view name_; + absl::string_view description_; + absl::string_view unit_; + std::vector label_keys_; + std::vector optional_label_keys_; + // Aggregation of the same key attributes. + absl::flat_hash_map storage_; + }; + + template + class Histogram { + public: + explicit Histogram(GlobalInstrumentsRegistry::GlobalInstrumentDescriptor u) + : name_(u.name), + description_(u.description), + unit_(u.unit), + label_keys_(std::move(u.label_keys)), + optional_label_keys_(std::move(u.optional_label_keys)) {} + + void Record(T t, absl::Span label_values, + absl::Span optional_values) { + std::string key = MakeLabelString(label_keys_, label_values, + optional_label_keys_, optional_values); + auto iter = storage_.find(key); + if (iter == storage_.end()) { + storage_.emplace(key, std::initializer_list{t}); + } else { + iter->second.push_back(t); + } + } + + absl::optional> GetValues( + absl::Span label_values, + absl::Span optional_values) { + auto iter = storage_.find(MakeLabelString( + label_keys_, label_values, optional_label_keys_, optional_values)); + if (iter == storage_.end()) { + return absl::nullopt; + } + return iter->second; + } + + private: + absl::string_view name_; + absl::string_view description_; + absl::string_view unit_; + std::vector label_keys_; + std::vector optional_label_keys_; + absl::flat_hash_map> storage_; + }; + + absl::AnyInvocable channel_filter_; + // Instruments. + absl::flat_hash_map> uint64_counters_; + absl::flat_hash_map> double_counters_; + absl::flat_hash_map> uint64_histograms_; + absl::flat_hash_map> double_histograms_; +}; + +class FakeStatsPluginBuilder { + public: + FakeStatsPluginBuilder& SetChannelFilter( + absl::AnyInvocable + channel_filter) { + channel_filter_ = std::move(channel_filter); + return *this; + } + + FakeStatsPluginBuilder& UseDisabledByDefaultMetrics(bool value) { + use_disabled_by_default_metrics_ = value; + return *this; + } + + std::shared_ptr BuildAndRegister() { + auto f = std::make_shared( + std::move(channel_filter_), use_disabled_by_default_metrics_); + GlobalStatsPluginRegistry::RegisterStatsPlugin(f); + return f; + } + + private: + absl::AnyInvocable + channel_filter_; + bool use_disabled_by_default_metrics_ = false; +}; + +std::shared_ptr MakeStatsPluginForTarget( + absl::string_view target_suffix); + +class GlobalInstrumentsRegistryTestPeer { + public: + static void ResetGlobalInstrumentsRegistry(); + + static absl::optional + FindUInt64CounterHandleByName(absl::string_view name); + static absl::optional + FindDoubleCounterHandleByName(absl::string_view name); + static absl::optional + FindUInt64HistogramHandleByName(absl::string_view name); + static absl::optional + FindDoubleHistogramHandleByName(absl::string_view name); + + static GlobalInstrumentsRegistry::GlobalInstrumentDescriptor* + FindMetricDescriptorByName(absl::string_view name); +}; + +class GlobalStatsPluginRegistryTestPeer { + public: + static void ResetGlobalStatsPluginRegistry() { + MutexLock lock(&*GlobalStatsPluginRegistry::mutex_); + GlobalStatsPluginRegistry::plugins_->clear(); + } +}; + } // namespace grpc_core #endif // GRPC_TEST_CORE_UTIL_FAKE_STATS_PLUGIN_H diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 8fad62d50bf..30f165067c4 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -329,6 +329,7 @@ grpc_cc_test( "//src/proto/grpc/testing/xds/v3:client_side_weighted_round_robin_proto", "//src/proto/grpc/testing/xds/v3:cluster_proto", "//src/proto/grpc/testing/xds/v3:wrr_locality_proto", + "//test/core/util:fake_stats_plugin", "//test/core/util:grpc_test_util", "//test/core/util:scoped_env_var", ], diff --git a/test/cpp/end2end/xds/xds_wrr_end2end_test.cc b/test/cpp/end2end/xds/xds_wrr_end2end_test.cc index 02166375669..16799140b8a 100644 --- a/test/cpp/end2end/xds/xds_wrr_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_wrr_end2end_test.cc @@ -27,8 +27,10 @@ #include "src/core/client_channel/backup_poller.h" #include "src/core/lib/config/config_vars.h" +#include "src/core/lib/experiments/experiments.h" #include "src/proto/grpc/testing/xds/v3/client_side_weighted_round_robin.grpc.pb.h" #include "src/proto/grpc/testing/xds/v3/wrr_locality.grpc.pb.h" +#include "test/core/util/fake_stats_plugin.h" #include "test/core/util/scoped_env_var.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" @@ -40,12 +42,25 @@ using ::envoy::extensions::load_balancing_policies:: client_side_weighted_round_robin::v3::ClientSideWeightedRoundRobin; using ::envoy::extensions::load_balancing_policies::wrr_locality::v3:: WrrLocality; -using WrrTest = XdsEnd2endTest; + +class WrrTest : public XdsEnd2endTest { + protected: + void SetUp() override { + // No-op -- tests must explicitly call InitClient(). + } + + static std::string LocalityNameString(absl::string_view sub_zone) { + return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", + kDefaultLocalityRegion, kDefaultLocalityZone, + sub_zone); + } +}; INSTANTIATE_TEST_SUITE_P(XdsTest, WrrTest, ::testing::Values(XdsTestType()), &XdsTestType::Name); TEST_P(WrrTest, Basic) { + InitClient(); CreateAndStartBackends(3); // Expected weights = qps / (util + (eps/qps)) = // 1/(0.2+0.2) : 1/(0.3+0.3) : 2/(1.5+0.1) = 6:4:3 @@ -92,6 +107,48 @@ TEST_P(WrrTest, Basic) { }); } +TEST_P(WrrTest, MetricsHaveLocalityLabel) { + if (!grpc_core::IsWrrDelegateToPickFirstEnabled()) return; + const auto kEndpointWeights = + grpc_core::GlobalInstrumentsRegistryTestPeer:: + FindDoubleHistogramHandleByName("grpc.lb.wrr.endpoint_weights") + .value(); + const std::string target = absl::StrCat("xds:", kServerName); + const absl::string_view kLabelValues[] = {/*target=*/target}; + // Register stats plugin before initializing client. + auto stats_plugin = grpc_core::FakeStatsPluginBuilder() + .UseDisabledByDefaultMetrics(true) + .BuildAndRegister(); + InitClient(); + CreateAndStartBackends(2); + auto cluster = default_cluster_; + WrrLocality wrr_locality; + wrr_locality.mutable_endpoint_picking_policy() + ->add_policies() + ->mutable_typed_extension_config() + ->mutable_typed_config() + ->PackFrom(ClientSideWeightedRoundRobin()); + cluster.mutable_load_balancing_policy() + ->add_policies() + ->mutable_typed_extension_config() + ->mutable_typed_config() + ->PackFrom(wrr_locality); + balancer_->ads_service()->SetCdsResource(cluster); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}, + {"locality1", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION); + // Make sure we have a metric value for each of the two localities. + EXPECT_THAT( + stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues, + {LocalityNameString("locality0")}), + ::testing::Optional(::testing::Not(::testing::IsEmpty()))); + EXPECT_THAT( + stats_plugin->GetHistogramValue(kEndpointWeights, kLabelValues, + {LocalityNameString("locality1")}), + ::testing::Optional(::testing::Not(::testing::IsEmpty()))); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 91da0bd2666..ad87f40205e 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2160,6 +2160,8 @@ src/core/lib/channel/channelz_registry.h \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ +src/core/lib/channel/metrics.cc \ +src/core/lib/channel/metrics.h \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/promise_based_filter.h \ src/core/lib/channel/server_call_tracer_filter.cc \ @@ -2891,6 +2893,7 @@ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.cc \ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h \ src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc \ src/core/load_balancing/weighted_target/weighted_target.cc \ +src/core/load_balancing/weighted_target/weighted_target.h \ src/core/load_balancing/xds/cds.cc \ src/core/load_balancing/xds/xds_channel_args.h \ src/core/load_balancing/xds/xds_cluster_impl.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index d328e17185f..f6649305f2d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1932,6 +1932,8 @@ src/core/lib/channel/channelz_registry.h \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/connected_channel.h \ src/core/lib/channel/context.h \ +src/core/lib/channel/metrics.cc \ +src/core/lib/channel/metrics.h \ src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/promise_based_filter.h \ src/core/lib/channel/server_call_tracer_filter.cc \ @@ -2668,6 +2670,7 @@ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.cc \ src/core/load_balancing/weighted_round_robin/static_stride_scheduler.h \ src/core/load_balancing/weighted_round_robin/weighted_round_robin.cc \ src/core/load_balancing/weighted_target/weighted_target.cc \ +src/core/load_balancing/weighted_target/weighted_target.h \ src/core/load_balancing/xds/cds.cc \ src/core/load_balancing/xds/xds_channel_args.h \ src/core/load_balancing/xds/xds_cluster_impl.cc \