From c145b7910e8878f19e0b93b5e3c95300f46a0d1d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 15 Sep 2023 10:19:46 -0700 Subject: [PATCH] [CSM] Add a server selector based on channel args (#34312) I've added channel args to `CreateNewServerCallTracer` on the `ServerCallTracerFactory`. The motivation is for CSM Observability where the OTel plugin will be configured to only do stats on servers which are xDS enabled, so I plan to check this via channel args. In the future, with the new scopes for metrics, I think I'll be able to change this to only check once per server or server connection instead of per call. --- BUILD | 1 + build_autogenerated.yaml | 1 + gRPC-C++.podspec | 2 ++ include/grpcpp/xds_server_builder.h | 3 +++ src/core/BUILD | 8 +++++++ src/core/ext/xds/xds_enabled_server.h | 24 +++++++++++++++++++++ src/core/lib/channel/call_tracer.cc | 8 ++++++- src/core/lib/channel/call_tracer.h | 3 +++ src/core/lib/surface/call.cc | 15 ++++++------- src/core/lib/surface/server.cc | 4 +++- src/core/lib/surface/server.h | 6 ++++++ src/cpp/ext/csm/BUILD | 2 ++ src/cpp/ext/csm/csm_observability.cc | 7 ++++++ src/cpp/ext/otel/otel_plugin.cc | 10 ++++++++- src/cpp/ext/otel/otel_plugin.h | 12 +++++++++++ src/cpp/ext/otel/otel_server_call_tracer.cc | 9 ++++++++ src/cpp/ext/otel/otel_server_call_tracer.h | 3 +++ tools/doxygen/Doxyfile.c++.internal | 1 + 18 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 src/core/ext/xds/xds_enabled_server.h diff --git a/BUILD b/BUILD index bde7f174e6a..9cd2497ca16 100644 --- a/BUILD +++ b/BUILD @@ -1157,6 +1157,7 @@ grpc_cc_library( "gpr", "grpc", "grpc++_base", + "//src/core:xds_enabled_server", ], ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7636c17cc6d..fbdebdd3222 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3551,6 +3551,7 @@ libs: - src/core/ext/transport/binder/wire_format/wire_reader.h - src/core/ext/transport/binder/wire_format/wire_reader_impl.h - src/core/ext/transport/binder/wire_format/wire_writer.h + - src/core/ext/xds/xds_enabled_server.h - src/cpp/client/client_stats_interceptor.h - src/cpp/client/create_channel_internal.h - src/cpp/client/secure_credentials.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 66fc44b4d39..747d7d63c9e 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -711,6 +711,7 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_cluster.h', 'src/core/ext/xds/xds_cluster_specifier_plugin.h', 'src/core/ext/xds/xds_common_types.h', + 'src/core/ext/xds/xds_enabled_server.h', 'src/core/ext/xds/xds_endpoint.h', 'src/core/ext/xds/xds_health_status.h', 'src/core/ext/xds/xds_http_fault_filter.h', @@ -1780,6 +1781,7 @@ Pod::Spec.new do |s| 'src/core/ext/xds/xds_cluster.h', 'src/core/ext/xds/xds_cluster_specifier_plugin.h', 'src/core/ext/xds/xds_common_types.h', + 'src/core/ext/xds/xds_enabled_server.h', 'src/core/ext/xds/xds_endpoint.h', 'src/core/ext/xds/xds_health_status.h', 'src/core/ext/xds/xds_http_fault_filter.h', diff --git a/include/grpcpp/xds_server_builder.h b/include/grpcpp/xds_server_builder.h index 1962ac6cdd1..ae560595df6 100644 --- a/include/grpcpp/xds_server_builder.h +++ b/include/grpcpp/xds_server_builder.h @@ -23,6 +23,8 @@ #include +#include "src/core/ext/xds/xds_enabled_server.h" + namespace grpc { class XdsServerServingStatusNotifierInterface { @@ -85,6 +87,7 @@ class XdsServerBuilder : public grpc::ServerBuilder { args.SetInt(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS, drain_grace_time_ms_); } + args.SetInt(GRPC_ARG_XDS_ENABLED_SERVER, 1); grpc_channel_args c_channel_args = args.c_channel_args(); grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create( {OnServingStatusUpdate, notifier_}, &c_channel_args); diff --git a/src/core/BUILD b/src/core/BUILD index be6c89730da..19d1d3333bb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4179,6 +4179,14 @@ grpc_cc_library( deps = ["//:gpr_platform"], ) +grpc_cc_library( + name = "xds_enabled_server", + hdrs = [ + "ext/xds/xds_enabled_server.h", + ], + language = "c++", +) + grpc_cc_library( name = "grpc_xds_client", srcs = [ diff --git a/src/core/ext/xds/xds_enabled_server.h b/src/core/ext/xds/xds_enabled_server.h new file mode 100644 index 00000000000..ea6edffa6ae --- /dev/null +++ b/src/core/ext/xds/xds_enabled_server.h @@ -0,0 +1,24 @@ +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#ifndef GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H +#define GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H + +// EXPERIMENTAL. Bool-valued channel arg used as an indicator that a server is +// xds enabled. +#define GRPC_ARG_XDS_ENABLED_SERVER "grpc.experimental.xds_enabled_server" + +#endif // GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H diff --git a/src/core/lib/channel/call_tracer.cc b/src/core/lib/channel/call_tracer.cc index ae1e477b5f1..9763b89469f 100644 --- a/src/core/lib/channel/call_tracer.cc +++ b/src/core/lib/channel/call_tracer.cc @@ -46,7 +46,13 @@ ServerCallTracerFactory* ServerCallTracerFactory::Get( const ChannelArgs& channel_args) { ServerCallTracerFactory* factory = channel_args.GetObject(); - return factory != nullptr ? factory : g_server_call_tracer_factory_; + if (factory == nullptr) { + factory = g_server_call_tracer_factory_; + } + if (factory && factory->IsServerTraced(channel_args)) { + return factory; + } + return nullptr; } void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) { diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index ad7d8dd0a06..4410817be47 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -170,6 +170,9 @@ class ServerCallTracerFactory { virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0; + // Returns true if a server is to be traced, false otherwise. + virtual bool IsServerTraced(const ChannelArgs& /*args*/) { return true; } + // Use this method to get the server call tracer factory from channel args, // instead of directly fetching it with `GetObject`. static ServerCallTracerFactory* Get(const ChannelArgs& channel_args); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index d567bcece38..2e674c0f3da 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -59,7 +59,6 @@ #include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/call_tracer.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/context.h" @@ -844,11 +843,10 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, // collecting from when the call is created at the transport. The idea is // that the transport would create the call tracer and pass it in as part of // the metadata. - auto* server_call_tracer_factory = ServerCallTracerFactory::Get( - args->server != nullptr ? args->server->channel_args() : ChannelArgs()); - if (server_call_tracer_factory != nullptr) { + if (args->server->server_call_tracer_factory() != nullptr) { auto* server_call_tracer = - server_call_tracer_factory->CreateNewServerCallTracer(arena); + args->server->server_call_tracer_factory()->CreateNewServerCallTracer( + arena); if (server_call_tracer != nullptr) { // Note that we are setting both // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and @@ -3266,11 +3264,10 @@ ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena, // collecting from when the call is created at the transport. The idea is that // the transport would create the call tracer and pass it in as part of the // metadata. - auto* server_call_tracer_factory = - ServerCallTracerFactory::Get(args->server->channel_args()); - if (server_call_tracer_factory != nullptr) { + if (args->server->server_call_tracer_factory() != nullptr) { auto* server_call_tracer = - server_call_tracer_factory->CreateNewServerCallTracer(arena); + args->server->server_call_tracer_factory()->CreateNewServerCallTracer( + arena); if (server_call_tracer != nullptr) { // Note that we are setting both // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index be829a0d57a..5bec18b059c 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -687,7 +687,9 @@ RefCountedPtr CreateChannelzNode( } // namespace Server::Server(const ChannelArgs& args) - : channel_args_(args), channelz_node_(CreateChannelzNode(args)) {} + : channel_args_(args), + channelz_node_(CreateChannelzNode(args)), + server_call_tracer_factory_(ServerCallTracerFactory::Get(args)) {} Server::~Server() { // Remove the cq pollsets from the config_fetcher. diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 05678146aaf..6916cf0000a 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -38,6 +38,7 @@ #include #include +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_stack.h" @@ -135,6 +136,10 @@ class Server : public InternallyRefCounted, return config_fetcher_.get(); } + ServerCallTracerFactory* server_call_tracer_factory() const { + return server_call_tracer_factory_; + } + void set_config_fetcher( std::unique_ptr config_fetcher); @@ -428,6 +433,7 @@ class Server : public InternallyRefCounted, ChannelArgs const channel_args_; RefCountedPtr channelz_node_; std::unique_ptr config_fetcher_; + ServerCallTracerFactory* const server_call_tracer_factory_; std::vector cqs_; std::vector pollsets_; diff --git a/src/cpp/ext/csm/BUILD b/src/cpp/ext/csm/BUILD index 411a3b3075a..dfe4772e98e 100644 --- a/src/cpp/ext/csm/BUILD +++ b/src/cpp/ext/csm/BUILD @@ -57,6 +57,7 @@ grpc_cc_library( "//:grpc_base", "//:protobuf_struct_upb", "//:uri_parser", + "//src/core:channel_args", "//src/core:env", "//src/core:error", "//src/core:json", @@ -64,6 +65,7 @@ grpc_cc_library( "//src/core:json_object_loader", "//src/core:json_reader", "//src/core:slice", + "//src/core:xds_enabled_server", "//src/cpp/ext/otel:otel_plugin", ], ) diff --git a/src/cpp/ext/csm/csm_observability.cc b/src/cpp/ext/csm/csm_observability.cc index cff3d5d327c..0df41e782d8 100644 --- a/src/cpp/ext/csm/csm_observability.cc +++ b/src/cpp/ext/csm/csm_observability.cc @@ -23,8 +23,12 @@ #include #include +#include "absl/types/optional.h" + #include +#include "src/core/ext/xds/xds_enabled_server.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/uri/uri_parser.h" #include "src/cpp/ext/otel/otel_plugin.h" @@ -74,6 +78,9 @@ CsmObservabilityBuilder& CsmObservabilityBuilder::SetTargetAttributeFilter( } absl::StatusOr CsmObservabilityBuilder::BuildAndRegister() { + builder_.SetServerSelector([](const grpc_core::ChannelArgs& args) { + return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false); + }); builder_.BuildAndRegisterGlobal(); builder_.SetTargetSelector(CsmChannelTargetSelector); return CsmObservability(); diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc index 8fcd57aa37a..cacb1f53227 100644 --- a/src/cpp/ext/otel/otel_plugin.cc +++ b/src/cpp/ext/otel/otel_plugin.cc @@ -160,6 +160,13 @@ OpenTelemetryPluginBuilder::SetTargetAttributeFilter( return *this; } +OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetServerSelector( + absl::AnyInvocable + server_selector) { + server_selector_ = std::move(server_selector); + return *this; +} + void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() { opentelemetry::nostd::shared_ptr meter_provider = meter_provider_; @@ -213,9 +220,10 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() { g_otel_plugin_state_->labels_injector = std::move(labels_injector_); g_otel_plugin_state_->target_attribute_filter = std::move(target_attribute_filter_); + g_otel_plugin_state_->server_selector = std::move(server_selector_); g_otel_plugin_state_->meter_provider = std::move(meter_provider); grpc_core::ServerCallTracerFactory::RegisterGlobal( - new grpc::internal::OpenTelemetryServerCallTracerFactory); + new grpc::internal::OpenTelemetryServerCallTracerFactory()); grpc_core::CoreConfiguration::RegisterBuilder( [target_selector = std::move(target_selector_)]( grpc_core::CoreConfiguration::Builder* builder) mutable { diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h index 8b4355ce038..3dfeaa6013f 100644 --- a/src/cpp/ext/otel/otel_plugin.h +++ b/src/cpp/ext/otel/otel_plugin.h @@ -36,6 +36,7 @@ #include "opentelemetry/metrics/sync_instruments.h" #include "opentelemetry/nostd/shared_ptr.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/transport/metadata_batch.h" namespace grpc { @@ -99,6 +100,8 @@ struct OTelPluginState { std::unique_ptr labels_injector; absl::AnyInvocable target_attribute_filter; + absl::AnyInvocable + server_selector; }; const struct OTelPluginState& OTelPluginState(); @@ -147,6 +150,13 @@ class OpenTelemetryPluginBuilder { OpenTelemetryPluginBuilder& SetTargetSelector( absl::AnyInvocable target_selector); + // If set, \a server_selector is called per incoming call on the server + // to decide whether to collect metrics on that call or not. + // TODO(yashkt): We should only need to do this per server connection or even + // per server. Change this when we have a ServerTracer. + OpenTelemetryPluginBuilder& SetServerSelector( + absl::AnyInvocable + server_selector); // If set, \a target_attribute_filter is called per channel to decide whether // to record the target attribute on client or to replace it with "other". // This helps reduce the cardinality on metrics in cases where many channels @@ -164,6 +174,8 @@ class OpenTelemetryPluginBuilder { target_attribute_filter_; absl::flat_hash_set metrics_; absl::AnyInvocable target_selector_; + absl::AnyInvocable + server_selector_; }; } // namespace internal diff --git a/src/cpp/ext/otel/otel_server_call_tracer.cc b/src/cpp/ext/otel/otel_server_call_tracer.cc index e5ab55b7dfd..6c9f5e014be 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.cc +++ b/src/cpp/ext/otel/otel_server_call_tracer.cc @@ -26,6 +26,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" @@ -199,5 +200,13 @@ OpenTelemetryServerCallTracerFactory::CreateNewServerCallTracer( return arena->ManagedNew(); } +bool OpenTelemetryServerCallTracerFactory::IsServerTraced( + const grpc_core::ChannelArgs& args) { + // Return true only if there is no server selector registered or if the server + // selector returns true. + return OTelPluginState().server_selector == nullptr || + OTelPluginState().server_selector(args); +} + } // namespace internal } // namespace grpc diff --git a/src/cpp/ext/otel/otel_server_call_tracer.h b/src/cpp/ext/otel/otel_server_call_tracer.h index 582c40a498a..2ce422e9d5a 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.h +++ b/src/cpp/ext/otel/otel_server_call_tracer.h @@ -22,6 +22,7 @@ #include #include "src/core/lib/channel/call_tracer.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/resource_quota/arena.h" namespace grpc { @@ -32,6 +33,8 @@ class OpenTelemetryServerCallTracerFactory public: grpc_core::ServerCallTracer* CreateNewServerCallTracer( grpc_core::Arena* arena) override; + + bool IsServerTraced(const grpc_core::ChannelArgs& args) override; }; } // namespace internal diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 42159e17351..bc2255fc402 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1982,6 +1982,7 @@ src/core/ext/xds/xds_cluster_specifier_plugin.cc \ src/core/ext/xds/xds_cluster_specifier_plugin.h \ src/core/ext/xds/xds_common_types.cc \ src/core/ext/xds/xds_common_types.h \ +src/core/ext/xds/xds_enabled_server.h \ src/core/ext/xds/xds_endpoint.cc \ src/core/ext/xds/xds_endpoint.h \ src/core/ext/xds/xds_health_status.cc \