[CSM] Add a server selector based on channel args (#34312)

I've added channel args to `CreateNewServerCallTracer` on the
`ServerCallTracerFactory`.
The motivation is for CSM Observability where the OTel plugin will be
configured to only do stats on servers which are xDS enabled, so I plan
to check this via channel args.
In the future, with the new scopes for metrics, I think I'll be able to
change this to only check once per server or server connection instead
of per call.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34375/head
Yash Tibrewal 1 year ago committed by GitHub
parent d589caa679
commit c145b7910e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1
      build_autogenerated.yaml
  3. 2
      gRPC-C++.podspec
  4. 3
      include/grpcpp/xds_server_builder.h
  5. 8
      src/core/BUILD
  6. 24
      src/core/ext/xds/xds_enabled_server.h
  7. 8
      src/core/lib/channel/call_tracer.cc
  8. 3
      src/core/lib/channel/call_tracer.h
  9. 15
      src/core/lib/surface/call.cc
  10. 4
      src/core/lib/surface/server.cc
  11. 6
      src/core/lib/surface/server.h
  12. 2
      src/cpp/ext/csm/BUILD
  13. 7
      src/cpp/ext/csm/csm_observability.cc
  14. 10
      src/cpp/ext/otel/otel_plugin.cc
  15. 12
      src/cpp/ext/otel/otel_plugin.h
  16. 9
      src/cpp/ext/otel/otel_server_call_tracer.cc
  17. 3
      src/cpp/ext/otel/otel_server_call_tracer.h
  18. 1
      tools/doxygen/Doxyfile.c++.internal

@ -1157,6 +1157,7 @@ grpc_cc_library(
"gpr",
"grpc",
"grpc++_base",
"//src/core:xds_enabled_server",
],
)

@ -3551,6 +3551,7 @@ libs:
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- src/core/ext/xds/xds_enabled_server.h
- src/cpp/client/client_stats_interceptor.h
- src/cpp/client/create_channel_internal.h
- src/cpp/client/secure_credentials.h

2
gRPC-C++.podspec generated

@ -711,6 +711,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_cluster.h',
'src/core/ext/xds/xds_cluster_specifier_plugin.h',
'src/core/ext/xds/xds_common_types.h',
'src/core/ext/xds/xds_enabled_server.h',
'src/core/ext/xds/xds_endpoint.h',
'src/core/ext/xds/xds_health_status.h',
'src/core/ext/xds/xds_http_fault_filter.h',
@ -1780,6 +1781,7 @@ Pod::Spec.new do |s|
'src/core/ext/xds/xds_cluster.h',
'src/core/ext/xds/xds_cluster_specifier_plugin.h',
'src/core/ext/xds/xds_common_types.h',
'src/core/ext/xds/xds_enabled_server.h',
'src/core/ext/xds/xds_endpoint.h',
'src/core/ext/xds/xds_health_status.h',
'src/core/ext/xds/xds_http_fault_filter.h',

@ -23,6 +23,8 @@
#include <grpcpp/server_builder.h>
#include "src/core/ext/xds/xds_enabled_server.h"
namespace grpc {
class XdsServerServingStatusNotifierInterface {
@ -85,6 +87,7 @@ class XdsServerBuilder : public grpc::ServerBuilder {
args.SetInt(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS,
drain_grace_time_ms_);
}
args.SetInt(GRPC_ARG_XDS_ENABLED_SERVER, 1);
grpc_channel_args c_channel_args = args.c_channel_args();
grpc_server_config_fetcher* fetcher = grpc_server_config_fetcher_xds_create(
{OnServingStatusUpdate, notifier_}, &c_channel_args);

@ -4179,6 +4179,14 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "xds_enabled_server",
hdrs = [
"ext/xds/xds_enabled_server.h",
],
language = "c++",
)
grpc_cc_library(
name = "grpc_xds_client",
srcs = [

@ -0,0 +1,24 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H
#define GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H
// EXPERIMENTAL. Bool-valued channel arg used as an indicator that a server is
// xds enabled.
#define GRPC_ARG_XDS_ENABLED_SERVER "grpc.experimental.xds_enabled_server"
#endif // GRPC_SRC_CORE_EXT_XDS_XDS_ENABLED_SERVER_H

@ -46,7 +46,13 @@ ServerCallTracerFactory* ServerCallTracerFactory::Get(
const ChannelArgs& channel_args) {
ServerCallTracerFactory* factory =
channel_args.GetObject<ServerCallTracerFactory>();
return factory != nullptr ? factory : g_server_call_tracer_factory_;
if (factory == nullptr) {
factory = g_server_call_tracer_factory_;
}
if (factory && factory->IsServerTraced(channel_args)) {
return factory;
}
return nullptr;
}
void ServerCallTracerFactory::RegisterGlobal(ServerCallTracerFactory* factory) {

@ -170,6 +170,9 @@ class ServerCallTracerFactory {
virtual ServerCallTracer* CreateNewServerCallTracer(Arena* arena) = 0;
// Returns true if a server is to be traced, false otherwise.
virtual bool IsServerTraced(const ChannelArgs& /*args*/) { return true; }
// Use this method to get the server call tracer factory from channel args,
// instead of directly fetching it with `GetObject`.
static ServerCallTracerFactory* Get(const ChannelArgs& channel_args);

@ -59,7 +59,6 @@
#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/context.h"
@ -844,11 +843,10 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
// collecting from when the call is created at the transport. The idea is
// that the transport would create the call tracer and pass it in as part of
// the metadata.
auto* server_call_tracer_factory = ServerCallTracerFactory::Get(
args->server != nullptr ? args->server->channel_args() : ChannelArgs());
if (server_call_tracer_factory != nullptr) {
if (args->server->server_call_tracer_factory() != nullptr) {
auto* server_call_tracer =
server_call_tracer_factory->CreateNewServerCallTracer(arena);
args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
arena);
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
@ -3266,11 +3264,10 @@ ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
// collecting from when the call is created at the transport. The idea is that
// the transport would create the call tracer and pass it in as part of the
// metadata.
auto* server_call_tracer_factory =
ServerCallTracerFactory::Get(args->server->channel_args());
if (server_call_tracer_factory != nullptr) {
if (args->server->server_call_tracer_factory() != nullptr) {
auto* server_call_tracer =
server_call_tracer_factory->CreateNewServerCallTracer(arena);
args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
arena);
if (server_call_tracer != nullptr) {
// Note that we are setting both
// GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and

@ -687,7 +687,9 @@ RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
} // namespace
Server::Server(const ChannelArgs& args)
: channel_args_(args), channelz_node_(CreateChannelzNode(args)) {}
: channel_args_(args),
channelz_node_(CreateChannelzNode(args)),
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)) {}
Server::~Server() {
// Remove the cq pollsets from the config_fetcher.

@ -38,6 +38,7 @@
#include <grpc/slice.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
@ -135,6 +136,10 @@ class Server : public InternallyRefCounted<Server>,
return config_fetcher_.get();
}
ServerCallTracerFactory* server_call_tracer_factory() const {
return server_call_tracer_factory_;
}
void set_config_fetcher(
std::unique_ptr<grpc_server_config_fetcher> config_fetcher);
@ -428,6 +433,7 @@ class Server : public InternallyRefCounted<Server>,
ChannelArgs const channel_args_;
RefCountedPtr<channelz::ServerNode> channelz_node_;
std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
ServerCallTracerFactory* const server_call_tracer_factory_;
std::vector<grpc_completion_queue*> cqs_;
std::vector<grpc_pollset*> pollsets_;

@ -57,6 +57,7 @@ grpc_cc_library(
"//:grpc_base",
"//:protobuf_struct_upb",
"//:uri_parser",
"//src/core:channel_args",
"//src/core:env",
"//src/core:error",
"//src/core:json",
@ -64,6 +65,7 @@ grpc_cc_library(
"//src/core:json_object_loader",
"//src/core:json_reader",
"//src/core:slice",
"//src/core:xds_enabled_server",
"//src/cpp/ext/otel:otel_plugin",
],
)

@ -23,8 +23,12 @@
#include <string>
#include <utility>
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/ext/xds/xds_enabled_server.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/cpp/ext/otel/otel_plugin.h"
@ -74,6 +78,9 @@ CsmObservabilityBuilder& CsmObservabilityBuilder::SetTargetAttributeFilter(
}
absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
builder_.SetServerSelector([](const grpc_core::ChannelArgs& args) {
return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
});
builder_.BuildAndRegisterGlobal();
builder_.SetTargetSelector(CsmChannelTargetSelector);
return CsmObservability();

@ -160,6 +160,13 @@ OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetServerSelector(
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector) {
server_selector_ = std::move(server_selector);
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
@ -213,9 +220,10 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
g_otel_plugin_state_->labels_injector = std::move(labels_injector_);
g_otel_plugin_state_->target_attribute_filter =
std::move(target_attribute_filter_);
g_otel_plugin_state_->server_selector = std::move(server_selector_);
g_otel_plugin_state_->meter_provider = std::move(meter_provider);
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory);
new grpc::internal::OpenTelemetryServerCallTracerFactory());
grpc_core::CoreConfiguration::RegisterBuilder(
[target_selector = std::move(target_selector_)](
grpc_core::CoreConfiguration::Builder* builder) mutable {

@ -36,6 +36,7 @@
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/metadata_batch.h"
namespace grpc {
@ -99,6 +100,8 @@ struct OTelPluginState {
std::unique_ptr<LabelsInjector> labels_injector;
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter;
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector;
};
const struct OTelPluginState& OTelPluginState();
@ -147,6 +150,13 @@ class OpenTelemetryPluginBuilder {
OpenTelemetryPluginBuilder& SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector);
// If set, \a server_selector is called per incoming call on the server
// to decide whether to collect metrics on that call or not.
// TODO(yashkt): We should only need to do this per server connection or even
// per server. Change this when we have a ServerTracer.
OpenTelemetryPluginBuilder& SetServerSelector(
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector);
// If set, \a target_attribute_filter is called per channel to decide whether
// to record the target attribute on client or to replace it with "other".
// This helps reduce the cardinality on metrics in cases where many channels
@ -164,6 +174,8 @@ class OpenTelemetryPluginBuilder {
target_attribute_filter_;
absl::flat_hash_set<std::string> metrics_;
absl::AnyInvocable<bool(absl::string_view /*target*/) const> target_selector_;
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector_;
};
} // namespace internal

@ -26,6 +26,7 @@
#include <string>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
@ -199,5 +200,13 @@ OpenTelemetryServerCallTracerFactory::CreateNewServerCallTracer(
return arena->ManagedNew<OpenTelemetryServerCallTracer>();
}
bool OpenTelemetryServerCallTracerFactory::IsServerTraced(
const grpc_core::ChannelArgs& args) {
// Return true only if there is no server selector registered or if the server
// selector returns true.
return OTelPluginState().server_selector == nullptr ||
OTelPluginState().server_selector(args);
}
} // namespace internal
} // namespace grpc

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/resource_quota/arena.h"
namespace grpc {
@ -32,6 +33,8 @@ class OpenTelemetryServerCallTracerFactory
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
grpc_core::Arena* arena) override;
bool IsServerTraced(const grpc_core::ChannelArgs& args) override;
};
} // namespace internal

@ -1982,6 +1982,7 @@ src/core/ext/xds/xds_cluster_specifier_plugin.cc \
src/core/ext/xds/xds_cluster_specifier_plugin.h \
src/core/ext/xds/xds_common_types.cc \
src/core/ext/xds/xds_common_types.h \
src/core/ext/xds/xds_enabled_server.h \
src/core/ext/xds/xds_endpoint.cc \
src/core/ext/xds/xds_endpoint.h \
src/core/ext/xds/xds_health_status.cc \

Loading…
Cancel
Save