[OTPlugin] Per-channel OpenTelemetry plugin (#36729)

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #36729

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36729 from yijiem:per-channel-stats-plugin 4786bed42f
PiperOrigin-RevId: 642030366
pull/36855/head
Yijie Ma 10 months ago committed by Copybara-Service
parent fabf135c7f
commit 87321f08b3
  1. 1
      BUILD
  2. 2
      include/grpc/impl/channel_arg_names.h
  3. 35
      include/grpcpp/ext/otel_plugin.h
  4. 24
      src/core/lib/surface/legacy_channel.cc
  5. 8
      src/core/telemetry/metrics.h
  6. 10
      src/cpp/ext/otel/key_value_iterable.h
  7. 64
      src/cpp/ext/otel/otel_client_call_tracer.cc
  8. 12
      src/cpp/ext/otel/otel_client_call_tracer.h
  9. 114
      src/cpp/ext/otel/otel_plugin.cc
  10. 49
      src/cpp/ext/otel/otel_plugin.h
  11. 8
      src/cpp/ext/otel/otel_server_call_tracer.cc
  12. 12
      src/cpp/ext/otel/otel_server_call_tracer.h
  13. 8
      test/core/test_util/fake_stats_plugin.h
  14. 26
      test/cpp/ext/csm/csm_observability_test.cc
  15. 255
      test/cpp/ext/otel/otel_plugin_test.cc
  16. 52
      test/cpp/ext/otel/otel_test_library.cc
  17. 25
      test/cpp/ext/otel/otel_test_library.h

@ -2950,6 +2950,7 @@ grpc_cc_library(
],
language = "c++",
deps = [
":grpc++",
"//src/cpp/ext/otel:otel_plugin",
],
)

@ -398,6 +398,8 @@
* If unspecified, it is unlimited */
#define GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS \
"grpc.max_allowed_incoming_connections"
/** Configure per-channel or per-server stats plugins. */
#define GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS "grpc.experimental.stats_plugins"
/** \} */
#endif /* GRPC_IMPL_CHANNEL_ARG_NAMES_H */

@ -26,17 +26,18 @@
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "opentelemetry/metrics/meter_provider.h"
#include <grpc/support/metrics.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/support/channel_arguments.h>
namespace grpc {
namespace internal {
class OpenTelemetryPluginBuilderImpl;
class OpenTelemetryPlugin;
} // namespace internal
class OpenTelemetryPluginOption {
@ -44,6 +45,21 @@ class OpenTelemetryPluginOption {
virtual ~OpenTelemetryPluginOption() = default;
};
namespace experimental {
/// EXPERIMENTAL API
class OpenTelemetryPlugin {
public:
virtual ~OpenTelemetryPlugin() = default;
/// EXPERIMENTAL API
/// Adds this OpenTelemetryPlugin to the channel args \a args.
virtual void AddToChannelArguments(grpc::ChannelArguments* args) = 0;
/// EXPERIMENTAL API
/// Adds this OpenTelemetryPlugin to the channel arguments that will be used
/// to create the server through \a builder.
virtual void AddToServerBuilder(grpc::ServerBuilder* builder) = 0;
};
} // namespace experimental
/// The most common way to use this API is -
///
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
@ -113,8 +129,8 @@ class OpenTelemetryPluginBuilder {
/// If set, \a generic_method_attribute_filter is called per call with a
/// generic method type to decide whether to record the method name or to
/// replace it with "other". Non-generic or pre-registered methods remain
/// unaffected. If not set, by default, generic method names are replaced with
/// "other" when recording metrics.
/// unaffected. If not set, by default, generic method names are replaced
/// with "other" when recording metrics.
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
@ -139,9 +155,16 @@ class OpenTelemetryPluginBuilder {
OpenTelemetryPluginBuilder& SetChannelScopeFilter(
absl::AnyInvocable<bool(const ChannelScope& /*scope*/) const>
channel_scope_filter);
/// Registers a global plugin that acts on all channels and servers running on
/// the process.
/// Builds and registers a global plugin that acts on all channels and servers
/// running on the process. Must be called no more than once and must not be
/// called if Build() is called.
absl::Status BuildAndRegisterGlobal();
/// EXPERIMENTAL API
/// Builds an open telemetry plugin, returns the plugin object when succeeded
/// or an error status when failed. Must be called no more than once and must
/// not be called if BuildAndRegisterGlobal() is called.
GRPC_MUST_USE_RESULT
absl::StatusOr<std::shared_ptr<experimental::OpenTelemetryPlugin>> Build();
private:
std::unique_ptr<internal::OpenTelemetryPluginBuilderImpl> impl_;

@ -92,14 +92,34 @@ absl::StatusOr<RefCountedPtr<Channel>> LegacyChannel::Create(
if (channel_stack_type == GRPC_SERVER_CHANNEL) {
*(*r)->stats_plugin_group =
GlobalStatsPluginRegistry::GetStatsPluginsForServer(args);
// Add per-server stats plugins.
auto* stats_plugin_list = args.GetPointer<
std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
if (stats_plugin_list != nullptr) {
for (const auto& plugin : **stats_plugin_list) {
(*r)->stats_plugin_group->AddStatsPlugin(
plugin, plugin->GetServerScopeConfig(args));
}
}
} else {
std::string authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY)
.value_or(CoreConfiguration::Get()
.resolver_registry()
.GetDefaultAuthority(target));
experimental::StatsPluginChannelScope scope(target, authority);
*(*r)->stats_plugin_group =
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
experimental::StatsPluginChannelScope(target, authority));
GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
// Add per-channel stats plugins.
auto* stats_plugin_list = args.GetPointer<
std::shared_ptr<std::vector<std::shared_ptr<StatsPlugin>>>>(
GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
if (stats_plugin_list != nullptr) {
for (const auto& plugin : **stats_plugin_list) {
(*r)->stats_plugin_group->AddStatsPlugin(
plugin, plugin->GetChannelScopeConfig(scope));
}
}
}
return MakeRefCounted<LegacyChannel>(
grpc_channel_stack_type_is_client(builder.channel_stack_type()),

@ -289,6 +289,14 @@ class StatsPlugin {
// configure the ServerCallTracer in GetServerCallTracer().
virtual std::pair<bool, std::shared_ptr<ScopeConfig>> IsEnabledForServer(
const ChannelArgs& args) const = 0;
// Gets a scope config for the client channel specified by \a scope. Note that
// the stats plugin should have been enabled for the channel.
virtual std::shared_ptr<StatsPlugin::ScopeConfig> GetChannelScopeConfig(
const experimental::StatsPluginChannelScope& scope) const = 0;
// Gets a scope config for the server specified by \a args. Note that the
// stats plugin should have been enabled for the server.
virtual std::shared_ptr<StatsPlugin::ScopeConfig> GetServerScopeConfig(
const ChannelArgs& args) const = 0;
// Adds \a value to the uint64 counter specified by \a handle. \a label_values
// and \a optional_label_values specify attributes that are associated with

@ -47,7 +47,7 @@ inline opentelemetry::nostd::string_view AbslStrViewToOpenTelemetryStrView(
// An iterable class based on opentelemetry::common::KeyValueIterable that
// allows gRPC to iterate on its various sources of attributes and avoid an
// allocation in cases wherever possible.
class OpenTelemetryPlugin::KeyValueIterable
class OpenTelemetryPluginImpl::KeyValueIterable
: public opentelemetry::common::KeyValueIterable {
public:
KeyValueIterable(
@ -55,10 +55,10 @@ class OpenTelemetryPlugin::KeyValueIterable
injected_labels_from_plugin_options,
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels,
const OpenTelemetryPlugin::ActivePluginOptionsView*
const OpenTelemetryPluginImpl::ActivePluginOptionsView*
active_plugin_options_view,
absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
bool is_client, const OpenTelemetryPlugin* otel_plugin)
bool is_client, const OpenTelemetryPluginImpl* otel_plugin)
: injected_labels_from_plugin_options_(
injected_labels_from_plugin_options),
additional_labels_(additional_labels),
@ -149,11 +149,11 @@ class OpenTelemetryPlugin::KeyValueIterable
injected_labels_from_plugin_options_;
absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels_;
const OpenTelemetryPlugin::ActivePluginOptionsView*
const OpenTelemetryPluginImpl::ActivePluginOptionsView*
active_plugin_options_view_;
absl::Span<const grpc_core::RefCountedStringValue> optional_labels_;
bool is_client_;
const OpenTelemetryPlugin* otel_plugin_;
const OpenTelemetryPluginImpl* otel_plugin_;
};
} // namespace internal

@ -61,11 +61,12 @@ namespace grpc {
namespace internal {
//
// OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer
// OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer
//
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
const OpenTelemetryPlugin::ClientCallTracer* parent, bool arena_allocated)
OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
const OpenTelemetryPluginImpl::ClientCallTracer* parent,
bool arena_allocated)
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
@ -86,7 +87,7 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
}
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (recv_initial_metadata != nullptr &&
recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
@ -97,7 +98,7 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(recv_initial_metadata);
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
@ -111,33 +112,33 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
parent_->otel_plugin_);
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer& send_message) {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) {
@ -179,10 +180,10 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
}
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordCancel(
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordCancel(
absl::Status /*cancel_error*/) {}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordEnd(
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::RecordEnd(
const gpr_timespec& /*latency*/) {
if (arena_allocated_) {
this->~CallAttemptTracer();
@ -191,29 +192,30 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordEnd(
}
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
absl::string_view /*annotation*/) {
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(absl::string_view /*annotation*/) {
// Not implemented
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation(
const Annotation& /*annotation*/) {
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
RecordAnnotation(const Annotation& /*annotation*/) {
// Not implemented
}
std::shared_ptr<grpc_core::TcpTracerInterface>
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
std::shared_ptr<grpc_core::TcpTracerInterface> OpenTelemetryPluginImpl::
ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
// No TCP trace.
return nullptr;
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
OptionalLabelKey key, grpc_core::RefCountedStringValue value) {
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
SetOptionalLabel(OptionalLabelKey key,
grpc_core::RefCountedStringValue value) {
CHECK(key < OptionalLabelKey::kSize);
optional_labels_[static_cast<size_t>(key)] = std::move(value);
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
void OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(grpc_metadata_batch* metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
@ -229,23 +231,23 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
}
//
// OpenTelemetryPlugin::ClientCallTracer
// OpenTelemetryPluginImpl::ClientCallTracer
//
OpenTelemetryPlugin::ClientCallTracer::ClientCallTracer(
OpenTelemetryPluginImpl::ClientCallTracer::ClientCallTracer(
const grpc_core::Slice& path, grpc_core::Arena* arena,
bool registered_method, OpenTelemetryPlugin* otel_plugin,
std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config)
bool registered_method, OpenTelemetryPluginImpl* otel_plugin,
std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config)
: path_(path.Ref()),
arena_(arena),
registered_method_(registered_method),
otel_plugin_(otel_plugin),
scope_config_(std::move(scope_config)) {}
OpenTelemetryPlugin::ClientCallTracer::~ClientCallTracer() {}
OpenTelemetryPluginImpl::ClientCallTracer::~ClientCallTracer() {}
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer*
OpenTelemetryPlugin::ClientCallTracer::StartNewAttempt(
OpenTelemetryPluginImpl::ClientCallTracer::CallAttemptTracer*
OpenTelemetryPluginImpl::ClientCallTracer::StartNewAttempt(
bool is_transparent_retry) {
// We allocate the first attempt on the arena and all subsequent attempts
// on the heap, so that in the common case we don't require a heap
@ -268,7 +270,7 @@ OpenTelemetryPlugin::ClientCallTracer::StartNewAttempt(
return new CallAttemptTracer(this, /*arena_allocated=*/false);
}
absl::string_view OpenTelemetryPlugin::ClientCallTracer::MethodForStats()
absl::string_view OpenTelemetryPluginImpl::ClientCallTracer::MethodForStats()
const {
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
if (registered_method_ ||
@ -279,12 +281,12 @@ absl::string_view OpenTelemetryPlugin::ClientCallTracer::MethodForStats()
return "other";
}
void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
absl::string_view /*annotation*/) {
// Not implemented
}
void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation(
void OpenTelemetryPluginImpl::ClientCallTracer::RecordAnnotation(
const Annotation& /*annotation*/) {
// Not implemented
}

@ -46,13 +46,13 @@
namespace grpc {
namespace internal {
class OpenTelemetryPlugin::ClientCallTracer
class OpenTelemetryPluginImpl::ClientCallTracer
: public grpc_core::ClientCallTracer {
public:
class CallAttemptTracer
: public grpc_core::ClientCallTracer::CallAttemptTracer {
public:
CallAttemptTracer(const OpenTelemetryPlugin::ClientCallTracer* parent,
CallAttemptTracer(const OpenTelemetryPluginImpl::ClientCallTracer* parent,
bool arena_allocated);
std::string TraceId() override {
@ -113,8 +113,8 @@ class OpenTelemetryPlugin::ClientCallTracer
ClientCallTracer(
const grpc_core::Slice& path, grpc_core::Arena* arena,
bool registered_method, OpenTelemetryPlugin* otel_plugin,
std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config);
bool registered_method, OpenTelemetryPluginImpl* otel_plugin,
std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config);
~ClientCallTracer() override;
std::string TraceId() override {
@ -143,8 +143,8 @@ class OpenTelemetryPlugin::ClientCallTracer
grpc_core::Slice path_;
grpc_core::Arena* arena_;
const bool registered_method_;
OpenTelemetryPlugin* otel_plugin_;
std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config_;
OpenTelemetryPluginImpl* otel_plugin_;
std::shared_ptr<OpenTelemetryPluginImpl::ClientScopeConfig> scope_config_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;

@ -86,7 +86,7 @@ absl::flat_hash_set<std::string> BaseMetrics() {
}
} // namespace
class OpenTelemetryPlugin::NPCMetricsKeyValueIterable
class OpenTelemetryPluginImpl::NPCMetricsKeyValueIterable
: public opentelemetry::common::KeyValueIterable {
public:
NPCMetricsKeyValueIterable(
@ -229,10 +229,11 @@ OpenTelemetryPluginBuilderImpl::SetChannelScopeFilter(
absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
if (meter_provider_ == nullptr) {
return absl::OkStatus();
return absl::InvalidArgumentError(
"Need to configure a valid meter provider.");
}
grpc_core::GlobalStatsPluginRegistry::RegisterStatsPlugin(
std::make_shared<OpenTelemetryPlugin>(
std::make_shared<OpenTelemetryPluginImpl>(
metrics_, meter_provider_, std::move(target_attribute_filter_),
std::move(generic_method_attribute_filter_),
std::move(server_selector_), std::move(plugin_options_),
@ -240,8 +241,22 @@ absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
return absl::OkStatus();
}
OpenTelemetryPlugin::CallbackMetricReporter::CallbackMetricReporter(
OpenTelemetryPlugin* ot_plugin, grpc_core::RegisteredMetricCallback* key)
absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
OpenTelemetryPluginBuilderImpl::Build() {
if (meter_provider_ == nullptr) {
return absl::InvalidArgumentError(
"Need to configure a valid meter provider.");
}
return std::make_shared<OpenTelemetryPluginImpl>(
metrics_, meter_provider_, std::move(target_attribute_filter_),
std::move(generic_method_attribute_filter_), std::move(server_selector_),
std::move(plugin_options_), std::move(optional_label_keys_),
std::move(channel_scope_filter_));
}
OpenTelemetryPluginImpl::CallbackMetricReporter::CallbackMetricReporter(
OpenTelemetryPluginImpl* ot_plugin,
grpc_core::RegisteredMetricCallback* key)
: ot_plugin_(ot_plugin), key_(key) {
// Since we are updating the timestamp and updating the cache for all
// registered instruments in a RegisteredMetricCallback, we will need to
@ -275,7 +290,7 @@ OpenTelemetryPlugin::CallbackMetricReporter::CallbackMetricReporter(
}
}
void OpenTelemetryPlugin::CallbackMetricReporter::ReportInt64(
void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportInt64(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
int64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -303,7 +318,7 @@ void OpenTelemetryPlugin::CallbackMetricReporter::ReportInt64(
cell.insert_or_assign(std::move(key), value);
}
void OpenTelemetryPlugin::CallbackMetricReporter::ReportDouble(
void OpenTelemetryPluginImpl::CallbackMetricReporter::ReportDouble(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -331,7 +346,12 @@ void OpenTelemetryPlugin::CallbackMetricReporter::ReportDouble(
cell.insert_or_assign(std::move(key), value);
}
OpenTelemetryPlugin::OpenTelemetryPlugin(
void OpenTelemetryPluginImpl::ServerBuilderOption::UpdateArguments(
grpc::ChannelArguments* args) {
plugin_->AddToChannelArguments(args);
}
OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
const absl::flat_hash_set<std::string>& metrics,
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider,
@ -548,7 +568,7 @@ namespace {
constexpr absl::string_view kLocality = "grpc.lb.locality";
}
absl::string_view OpenTelemetryPlugin::OptionalLabelKeyToString(
absl::string_view OpenTelemetryPluginImpl::OptionalLabelKeyToString(
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key) {
switch (key) {
case grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
@ -560,7 +580,7 @@ absl::string_view OpenTelemetryPlugin::OptionalLabelKeyToString(
}
absl::optional<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OpenTelemetryPlugin::OptionalLabelStringToKey(absl::string_view key) {
OpenTelemetryPluginImpl::OptionalLabelStringToKey(absl::string_view key) {
if (key == kLocality) {
return grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
kLocality;
@ -569,7 +589,7 @@ OpenTelemetryPlugin::OptionalLabelStringToKey(absl::string_view key) {
}
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPlugin::IsEnabledForChannel(
OpenTelemetryPluginImpl::IsEnabledForChannel(
const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
if (channel_scope_filter_ == nullptr || channel_scope_filter_(scope)) {
return {true, std::make_shared<ClientScopeConfig>(this, scope)};
@ -578,7 +598,7 @@ OpenTelemetryPlugin::IsEnabledForChannel(
}
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPlugin::IsEnabledForServer(
OpenTelemetryPluginImpl::IsEnabledForServer(
const grpc_core::ChannelArgs& args) const {
// Return true only if there is no server selector registered or if the
// server selector returns true.
@ -588,7 +608,21 @@ OpenTelemetryPlugin::IsEnabledForServer(
return {false, nullptr};
}
void OpenTelemetryPlugin::AddCounter(
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
OpenTelemetryPluginImpl::GetChannelScopeConfig(
const OpenTelemetryPluginBuilder::ChannelScope& scope) const {
GPR_ASSERT(channel_scope_filter_ == nullptr || channel_scope_filter_(scope));
return std::make_shared<ClientScopeConfig>(this, scope);
}
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>
OpenTelemetryPluginImpl::GetServerScopeConfig(
const grpc_core::ChannelArgs& args) const {
GPR_ASSERT(server_selector_ == nullptr || server_selector_(args));
return std::make_shared<ServerScopeConfig>(this, args);
}
void OpenTelemetryPluginImpl::AddCounter(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -612,7 +646,7 @@ void OpenTelemetryPlugin::AddCounter(
instrument_data.optional_labels_bits));
}
void OpenTelemetryPlugin::AddCounter(
void OpenTelemetryPluginImpl::AddCounter(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -636,7 +670,7 @@ void OpenTelemetryPlugin::AddCounter(
instrument_data.optional_labels_bits));
}
void OpenTelemetryPlugin::RecordHistogram(
void OpenTelemetryPluginImpl::RecordHistogram(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -662,7 +696,7 @@ void OpenTelemetryPlugin::RecordHistogram(
opentelemetry::context::Context{});
}
void OpenTelemetryPlugin::RecordHistogram(
void OpenTelemetryPluginImpl::RecordHistogram(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
@ -688,7 +722,7 @@ void OpenTelemetryPlugin::RecordHistogram(
opentelemetry::context::Context{});
}
void OpenTelemetryPlugin::AddCallback(
void OpenTelemetryPluginImpl::AddCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
@ -764,7 +798,7 @@ void OpenTelemetryPlugin::AddCallback(
}
}
void OpenTelemetryPlugin::RemoveCallback(
void OpenTelemetryPluginImpl::RemoveCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
@ -841,7 +875,7 @@ void OpenTelemetryPlugin::RemoveCallback(
}
template <typename ValueType>
void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::Observe(
void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::Observe(
opentelemetry::metrics::ObserverResult& result, const Cache& cache) {
const auto& descriptor =
grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id});
@ -868,8 +902,9 @@ void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::Observe(
// OpenTelemetry calls our callback with its observable_registry's lock
// held.
template <typename ValueType>
void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::CallbackGaugeCallback(
opentelemetry::metrics::ObserverResult result, void* arg) {
void OpenTelemetryPluginImpl::CallbackGaugeState<ValueType>::
CallbackGaugeCallback(opentelemetry::metrics::ObserverResult result,
void* arg) {
auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg);
auto now = grpc_core::Timestamp::Now();
grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_);
@ -892,32 +927,54 @@ void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::CallbackGaugeCallback(
}
}
grpc_core::ClientCallTracer* OpenTelemetryPlugin::GetClientCallTracer(
grpc_core::ClientCallTracer* OpenTelemetryPluginImpl::GetClientCallTracer(
const grpc_core::Slice& path, bool registered_method,
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
return grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<ClientCallTracer>(
path, grpc_core::GetContext<grpc_core::Arena>(), registered_method,
this,
std::static_pointer_cast<OpenTelemetryPlugin::ClientScopeConfig>(
std::static_pointer_cast<OpenTelemetryPluginImpl::ClientScopeConfig>(
scope_config));
}
grpc_core::ServerCallTracer* OpenTelemetryPlugin::GetServerCallTracer(
grpc_core::ServerCallTracer* OpenTelemetryPluginImpl::GetServerCallTracer(
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
return grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<ServerCallTracer>(
this,
std::static_pointer_cast<OpenTelemetryPlugin::ServerScopeConfig>(
std::static_pointer_cast<OpenTelemetryPluginImpl::ServerScopeConfig>(
scope_config));
}
bool OpenTelemetryPlugin::IsInstrumentEnabled(
bool OpenTelemetryPluginImpl::IsInstrumentEnabled(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle) const {
return !absl::holds_alternative<Disabled>(
instruments_data_.at(handle.index).instrument);
}
void OpenTelemetryPluginImpl::AddToChannelArguments(
grpc::ChannelArguments* args) {
const grpc_channel_args c_args = args->c_channel_args();
auto* stats_plugin_list = grpc_channel_args_find_pointer<
std::shared_ptr<std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>>(
&c_args, GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS);
if (stats_plugin_list != nullptr) {
(*stats_plugin_list)->emplace_back(shared_from_this());
} else {
auto stats_plugin_list = std::make_shared<
std::vector<std::shared_ptr<grpc_core::StatsPlugin>>>();
args->SetPointerWithVtable(
GRPC_ARG_EXPERIMENTAL_STATS_PLUGINS, &stats_plugin_list,
grpc_core::ChannelArgTypeTraits<decltype(stats_plugin_list)>::VTable());
stats_plugin_list->emplace_back(shared_from_this());
}
}
void OpenTelemetryPluginImpl::AddToServerBuilder(grpc::ServerBuilder* builder) {
builder->SetOption(std::make_unique<ServerBuilderOption>(shared_from_this()));
}
} // namespace internal
constexpr absl::string_view
@ -1012,4 +1069,9 @@ absl::Status OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
return impl_->BuildAndRegisterGlobal();
}
absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
OpenTelemetryPluginBuilder::Build() {
return impl_->Build();
}
} // namespace grpc

@ -40,6 +40,7 @@
#include <grpc/support/port_platform.h>
#include <grpcpp/ext/otel_plugin.h>
#include <grpcpp/impl/server_builder_option.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -176,6 +177,8 @@ class OpenTelemetryPluginBuilderImpl {
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter);
absl::Status BuildAndRegisterGlobal();
absl::StatusOr<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
Build();
const absl::flat_hash_set<std::string>& TestOnlyEnabledMetrics() {
return metrics_;
@ -199,9 +202,12 @@ class OpenTelemetryPluginBuilderImpl {
channel_scope_filter_;
};
class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
class OpenTelemetryPluginImpl
: public grpc::experimental::OpenTelemetryPlugin,
public grpc_core::StatsPlugin,
public std::enable_shared_from_this<OpenTelemetryPluginImpl> {
public:
OpenTelemetryPlugin(
OpenTelemetryPluginImpl(
const absl::flat_hash_set<std::string>& metrics,
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider,
@ -229,7 +235,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
class ActivePluginOptionsView {
public:
static ActivePluginOptionsView MakeForClient(
absl::string_view target, const OpenTelemetryPlugin* otel_plugin) {
absl::string_view target, const OpenTelemetryPluginImpl* otel_plugin) {
return ActivePluginOptionsView(
[target](const InternalOpenTelemetryPluginOption& plugin_option) {
return plugin_option.IsActiveOnClientChannel(target);
@ -239,7 +245,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
static ActivePluginOptionsView MakeForServer(
const grpc_core::ChannelArgs& args,
const OpenTelemetryPlugin* otel_plugin) {
const OpenTelemetryPluginImpl* otel_plugin) {
return ActivePluginOptionsView(
[&args](const InternalOpenTelemetryPluginOption& plugin_option) {
return plugin_option.IsActiveOnServer(args);
@ -250,7 +256,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
bool ForEach(absl::FunctionRef<
bool(const InternalOpenTelemetryPluginOption&, size_t)>
func,
const OpenTelemetryPlugin* otel_plugin) const {
const OpenTelemetryPluginImpl* otel_plugin) const {
for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
const auto& plugin_option = otel_plugin->plugin_options()[i];
if (active_mask_[i] && !func(*plugin_option, i)) {
@ -263,7 +269,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
private:
explicit ActivePluginOptionsView(
absl::FunctionRef<bool(const InternalOpenTelemetryPluginOption&)> func,
const OpenTelemetryPlugin* otel_plugin) {
const OpenTelemetryPluginImpl* otel_plugin) {
for (size_t i = 0; i < otel_plugin->plugin_options().size(); ++i) {
const auto& plugin_option = otel_plugin->plugin_options()[i];
if (plugin_option != nullptr && func(*plugin_option)) {
@ -277,7 +283,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
class ClientScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
public:
ClientScopeConfig(const OpenTelemetryPlugin* otel_plugin,
ClientScopeConfig(const OpenTelemetryPluginImpl* otel_plugin,
const OpenTelemetryPluginBuilder::ChannelScope& scope)
: active_plugin_options_view_(ActivePluginOptionsView::MakeForClient(
scope.target(), otel_plugin)),
@ -302,7 +308,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
};
class ServerScopeConfig : public grpc_core::StatsPlugin::ScopeConfig {
public:
ServerScopeConfig(const OpenTelemetryPlugin* otel_plugin,
ServerScopeConfig(const OpenTelemetryPluginImpl* otel_plugin,
const grpc_core::ChannelArgs& args)
: active_plugin_options_view_(
ActivePluginOptionsView::MakeForServer(args, otel_plugin)) {}
@ -339,7 +345,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
// This object should be used inline.
class CallbackMetricReporter : public grpc_core::CallbackMetricReporter {
public:
CallbackMetricReporter(OpenTelemetryPlugin* ot_plugin,
CallbackMetricReporter(OpenTelemetryPluginImpl* ot_plugin,
grpc_core::RegisteredMetricCallback* key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);
@ -357,10 +363,23 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(
CallbackGaugeState<double>::ot_plugin->mu_) override;
OpenTelemetryPlugin* ot_plugin_;
OpenTelemetryPluginImpl* ot_plugin_;
grpc_core::RegisteredMetricCallback* key_;
};
class ServerBuilderOption : public grpc::ServerBuilderOption {
public:
explicit ServerBuilderOption(
std::shared_ptr<OpenTelemetryPluginImpl> plugin)
: plugin_(std::move(plugin)) {}
void UpdateArguments(grpc::ChannelArguments* args) override;
void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>*
/*plugins*/) override {}
private:
std::shared_ptr<OpenTelemetryPluginImpl> plugin_;
};
// Returns the string form of \a key
static absl::string_view OptionalLabelKeyToString(
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key);
@ -371,12 +390,20 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OptionalLabelStringToKey(absl::string_view key);
// grpc::OpenTelemetryPlugin:
void AddToChannelArguments(grpc::ChannelArguments* args) override;
void AddToServerBuilder(grpc::ServerBuilder* builder) override;
// StatsPlugin:
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForChannel(
const OpenTelemetryPluginBuilder::ChannelScope& scope) const override;
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForServer(const grpc_core::ChannelArgs& args) const override;
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> GetChannelScopeConfig(
const OpenTelemetryPluginBuilder::ChannelScope& scope) const override;
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> GetServerScopeConfig(
const grpc_core::ChannelArgs& args) const override;
void AddCounter(
grpc_core::GlobalInstrumentsRegistry::GlobalInstrumentHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
@ -442,7 +469,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
// One instrument can be registered by multiple callbacks.
absl::flat_hash_map<grpc_core::RegisteredMetricCallback*, Cache> caches
ABSL_GUARDED_BY(ot_plugin->mu_);
OpenTelemetryPlugin* ot_plugin;
OpenTelemetryPluginImpl* ot_plugin;
static void CallbackGaugeCallback(
opentelemetry::metrics::ObserverResult result, void* arg)

@ -49,7 +49,7 @@
namespace grpc {
namespace internal {
void OpenTelemetryPlugin::ServerCallTracer::RecordReceivedInitialMetadata(
void OpenTelemetryPluginImpl::ServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
path_ =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata())->Ref();
@ -80,7 +80,7 @@ void OpenTelemetryPlugin::ServerCallTracer::RecordReceivedInitialMetadata(
}
}
void OpenTelemetryPlugin::ServerCallTracer::RecordSendInitialMetadata(
void OpenTelemetryPluginImpl::ServerCallTracer::RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) {
scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
@ -96,14 +96,14 @@ void OpenTelemetryPlugin::ServerCallTracer::RecordSendInitialMetadata(
otel_plugin_);
}
void OpenTelemetryPlugin::ServerCallTracer::RecordSendTrailingMetadata(
void OpenTelemetryPluginImpl::ServerCallTracer::RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
}
void OpenTelemetryPlugin::ServerCallTracer::RecordEnd(
void OpenTelemetryPluginImpl::ServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {

@ -30,14 +30,14 @@
namespace grpc {
namespace internal {
// OpenTelemetryPlugin::ServerCallTracer implementation
// OpenTelemetryPluginImpl::ServerCallTracer implementation
class OpenTelemetryPlugin::ServerCallTracer
class OpenTelemetryPluginImpl::ServerCallTracer
: public grpc_core::ServerCallTracer {
public:
ServerCallTracer(
OpenTelemetryPlugin* otel_plugin,
std::shared_ptr<OpenTelemetryPlugin::ServerScopeConfig> scope_config)
OpenTelemetryPluginImpl* otel_plugin,
std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config)
: start_time_(absl::Now()),
injected_labels_from_plugin_options_(
otel_plugin->plugin_options().size()),
@ -129,8 +129,8 @@ class OpenTelemetryPlugin::ServerCallTracer
bool registered_method_;
std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_;
OpenTelemetryPlugin* otel_plugin_;
std::shared_ptr<OpenTelemetryPlugin::ServerScopeConfig> scope_config_;
OpenTelemetryPluginImpl* otel_plugin_;
std::shared_ptr<OpenTelemetryPluginImpl::ServerScopeConfig> scope_config_;
};
} // namespace internal

@ -270,6 +270,14 @@ class FakeStatsPlugin : public StatsPlugin {
const ChannelArgs& /*args*/) const override {
return {true, nullptr};
}
std::shared_ptr<StatsPlugin::ScopeConfig> GetChannelScopeConfig(
const experimental::StatsPluginChannelScope& /*scope*/) const override {
return nullptr;
}
std::shared_ptr<StatsPlugin::ScopeConfig> GetServerScopeConfig(
const ChannelArgs& /*args*/) const override {
return nullptr;
}
void AddCounter(
GlobalInstrumentsRegistry::GlobalInstrumentHandle handle, uint64_t value,

@ -20,6 +20,7 @@
#include "google/cloud/opentelemetry/resource_detector.h"
#include "gtest/gtest.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include <grpcpp/ext/csm_observability.h>
#include <grpcpp/ext/otel_plugin.h>
@ -32,8 +33,13 @@ namespace testing {
namespace {
TEST(CsmObservabilityBuilderTest, Basic) {
EXPECT_EQ(CsmObservabilityBuilder().BuildAndRegister().status(),
absl::OkStatus());
EXPECT_EQ(
CsmObservabilityBuilder()
.SetMeterProvider(
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>())
.BuildAndRegister()
.status(),
absl::OkStatus());
}
TEST(GsmDependencyTest, GoogleCloudOpenTelemetryDependency) {
@ -68,7 +74,13 @@ TEST(CsmChannelTargetSelectorTest, XdsTargetsWithTDAuthority) {
}
TEST(CsmChannelTargetSelectorTest, CsmObservabilityOutOfScope) {
{ auto obs = CsmObservabilityBuilder().BuildAndRegister(); }
{
auto obs =
CsmObservabilityBuilder()
.SetMeterProvider(
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>())
.BuildAndRegister();
}
// When CsmObservability goes out of scope, the target selector should return
// false as well.
EXPECT_FALSE(internal::CsmChannelTargetSelector("foo.bar.google.com"));
@ -83,7 +95,13 @@ TEST(CsmServerSelectorTest, ChannelArgs) {
}
TEST(CsmServerSelectorTest, CsmObservabilityOutOfScope) {
{ auto obs = CsmObservabilityBuilder().BuildAndRegister(); }
{
auto obs =
CsmObservabilityBuilder()
.SetMeterProvider(
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>())
.BuildAndRegister();
}
// When CsmObservability goes out of scope, the server selector should return
// false as well.
EXPECT_FALSE(internal::CsmServerSelector(grpc_core::ChannelArgs()));

@ -30,6 +30,7 @@
#include "opentelemetry/metrics/provider.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/attribute_utils.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/sdk/metrics/data/point_data.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
@ -1524,7 +1525,6 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
.Build();
// Build and register a separate OpenTelemetryPlugin and verify its histogram
// recording.
grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
auto reader = BuildAndRegisterOpenTelemetryPlugin(std::move(
Options()
.set_metric_names({kMetricName})
@ -1534,7 +1534,6 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
})
.add_optional_label(kOptionalLabelKeys[0])
.add_optional_label(kOptionalLabelKeys[1])));
EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), absl::OkStatus());
grpc_core::ChannelArgs args;
args = args.Set(GRPC_ARG_SERVER_SELECTOR_KEY, GRPC_ARG_SERVER_SELECTOR_VALUE);
{
@ -2002,6 +2001,258 @@ TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) {
::testing::UnorderedElementsAre("grpc.test.metric_3"));
}
TEST_F(OpenTelemetryPluginEnd2EndTest, RegisterMultipleStatsPluginsPerChannel) {
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin1;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader1;
std::tie(plugin1, reader1) = BuildOpenTelemetryPlugin(std::move(
Options().set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})));
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin2;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader2;
std::tie(plugin2, reader2) = BuildOpenTelemetryPlugin(std::move(
Options().set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})));
Init(std::move(
Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})
.add_per_channel_stats_plugin(std::move(plugin1))
.add_per_channel_stats_plugin(std::move(plugin2))));
const std::vector<absl::string_view> kLabelKeys = {
"grpc.method", "grpc.target", "grpc.status"};
const std::vector<absl::string_view> kLabelValues = {
kMethodName, canonical_server_address_, "OK"};
const std::vector<absl::string_view> kOptionalLabelKeys = {};
const std::vector<absl::string_view> kOptionalLabelValues = {};
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); },
reader1.get());
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); },
reader2.get());
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
}
TEST_F(OpenTelemetryPluginEnd2EndTest,
RegisterSameStatsPluginForMultipleChannels) {
// channel1 channel2
// | |
// | (global plugin, plugin1) | (global plugin, plugin1, plugin2)
// | |
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin1;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader1;
std::tie(plugin1, reader1) = BuildOpenTelemetryPlugin(std::move(
Options().set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})));
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin2;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader2;
std::tie(plugin2, reader2) = BuildOpenTelemetryPlugin(std::move(
Options().set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})));
Init(std::move(
Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})
.add_per_channel_stats_plugin(plugin1)));
// Adds the same plugin to another channel.
ChannelArguments channel_args;
plugin1->AddToChannelArguments(&channel_args);
plugin2->AddToChannelArguments(&channel_args);
auto channel2 = grpc::CreateCustomChannel(
server_address_, grpc::InsecureChannelCredentials(), channel_args);
std::unique_ptr<EchoTestService::Stub> stub =
EchoTestService::NewStub(std::move(channel2));
// Sends 2 RPCs, one from each channel.
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
stub->Echo(&context, request, &response);
SendRPC();
const std::vector<absl::string_view> kLabelKeys = {
"grpc.method", "grpc.target", "grpc.status"};
const std::vector<absl::string_view> kLabelValues = {
kMethodName, canonical_server_address_, "OK"};
const std::vector<absl::string_view> kOptionalLabelKeys = {};
const std::vector<absl::string_view> kOptionalLabelValues = {};
const char* kMetricName = "grpc.client.attempt.duration";
// Verifies that we got 2 histogram points in global plugin.
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(2)))))))));
// Verifies that we got 2 histogram points in plugin1.
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); },
reader1.get());
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(2)))))))));
// Verifies that we got 1 histogram point in plugin2.
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); },
reader2.get());
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
}
TEST_F(OpenTelemetryPluginEnd2EndTest, RegisterMultipleStatsPluginsPerServer) {
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader;
std::tie(plugin, reader) = BuildOpenTelemetryPlugin(std::move(
Options().set_metric_names({grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})));
Init(std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.add_per_server_stats_plugin(std::move(plugin))));
const std::vector<absl::string_view> kLabelKeys = {"grpc.method",
"grpc.status"};
const std::vector<absl::string_view> kLabelValues = {kMethodName, "OK"};
const std::vector<absl::string_view> kOptionalLabelKeys = {};
const std::vector<absl::string_view> kOptionalLabelValues = {};
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
// Verifies that both plugins have the server-side metrics recorded.
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); },
reader.get());
EXPECT_THAT(
data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
::testing::Field(
&opentelemetry::sdk::metrics::PointDataAttributes::point_data,
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(
::testing::Field(&opentelemetry::sdk::metrics::
HistogramPointData::count_,
::testing::Eq(1)))))))));
}
} // namespace
} // namespace testing
} // namespace grpc

@ -149,7 +149,6 @@ void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
if (!config.service_config.empty()) {
channel_args.SetString(GRPC_ARG_SERVICE_CONFIG, config.service_config);
}
reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
grpc_init();
grpc::ServerBuilder builder;
int port;
@ -157,11 +156,18 @@ void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
for (auto& per_server_stats_plugin : config.per_server_stats_plugins) {
per_server_stats_plugin->AddToServerBuilder(&builder);
}
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
canonical_server_address_ = absl::StrCat("dns:///", server_address_);
for (auto& per_channel_stats_plugin : config.per_channel_stats_plugins) {
per_channel_stats_plugin->AddToChannelArguments(&channel_args);
}
reader_ = BuildAndRegisterOpenTelemetryPlugin(std::move(config));
auto channel = grpc::CreateCustomChannel(
server_address_, grpc::InsecureChannelCredentials(), channel_args);
@ -238,10 +244,35 @@ OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
return data;
}
std::pair<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>,
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>>
OpenTelemetryPluginEnd2EndTest::BuildOpenTelemetryPlugin(
OpenTelemetryPluginEnd2EndTest::Options options) {
grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
auto plugin = ot_builder.Build();
EXPECT_TRUE(plugin.ok());
return {*plugin, reader};
}
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
OpenTelemetryPluginEnd2EndTest::Options options) {
grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
absl::Status expected_status;
if (!options.use_meter_provider) {
expected_status =
absl::InvalidArgumentError("Need to configure a valid meter provider.");
}
auto reader = ConfigureOTBuilder(std::move(options), &ot_builder);
EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), expected_status);
return reader;
}
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
OpenTelemetryPluginEnd2EndTest::ConfigureOTBuilder(
OpenTelemetryPluginEnd2EndTest::Options options,
grpc::internal::OpenTelemetryPluginBuilderImpl* ot_builder) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
@ -252,28 +283,27 @@ OpenTelemetryPluginEnd2EndTest::BuildAndRegisterOpenTelemetryPlugin(
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader =
std::make_shared<grpc::testing::MockMetricReader>();
meter_provider->AddMetricReader(reader);
ot_builder.DisableAllMetrics();
ot_builder.EnableMetrics(options.metric_names);
ot_builder->DisableAllMetrics();
ot_builder->EnableMetrics(options.metric_names);
if (options.use_meter_provider) {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
reader.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader);
ot_builder.SetMeterProvider(std::move(meter_provider));
ot_builder->SetMeterProvider(std::move(meter_provider));
}
ot_builder.SetChannelScopeFilter(std::move(options.channel_scope_filter));
ot_builder.SetServerSelector(std::move(options.server_selector));
ot_builder.SetTargetAttributeFilter(
ot_builder->SetChannelScopeFilter(std::move(options.channel_scope_filter));
ot_builder->SetServerSelector(std::move(options.server_selector));
ot_builder->SetTargetAttributeFilter(
std::move(options.target_attribute_filter));
ot_builder.SetGenericMethodAttributeFilter(
ot_builder->SetGenericMethodAttributeFilter(
std::move(options.generic_method_attribute_filter));
for (auto& option : options.plugin_options) {
ot_builder.AddPluginOption(std::move(option));
ot_builder->AddPluginOption(std::move(option));
}
for (auto& optional_label_key : options.optional_label_keys) {
ot_builder.AddOptionalLabel(optional_label_key);
ot_builder->AddOptionalLabel(optional_label_key);
}
EXPECT_EQ(ot_builder.BuildAndRegisterGlobal(), absl::OkStatus());
return reader;
}

@ -132,6 +132,18 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
return *this;
}
Options& add_per_channel_stats_plugin(
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin) {
per_channel_stats_plugins.emplace_back(std::move(plugin));
return *this;
}
Options& add_per_server_stats_plugin(
std::shared_ptr<grpc::experimental::OpenTelemetryPlugin> plugin) {
per_server_stats_plugins.emplace_back(std::move(plugin));
return *this;
}
std::vector<absl::string_view> metric_names;
// TODO(yashykt): opentelemetry::sdk::resource::Resource doesn't have a copy
// assignment operator so wrapping it in a unique_ptr till it is fixed.
@ -158,6 +170,10 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
plugin_options;
absl::flat_hash_set<absl::string_view> optional_label_keys;
std::vector<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
per_channel_stats_plugins;
std::vector<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>>
per_server_stats_plugins;
};
class MetricsCollectorThread {
@ -183,6 +199,11 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
std::thread thread_;
};
static std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
ConfigureOTBuilder(
OpenTelemetryPluginEnd2EndTest::Options options,
grpc::internal::OpenTelemetryPluginBuilderImpl* ot_builder);
// Note that we can't use SetUp() here since we want to send in parameters.
void Init(Options config);
@ -193,6 +214,10 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
void SendRPC();
void SendGenericRPC();
std::pair<std::shared_ptr<grpc::experimental::OpenTelemetryPlugin>,
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>>
BuildOpenTelemetryPlugin(OpenTelemetryPluginEnd2EndTest::Options options);
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader>
BuildAndRegisterOpenTelemetryPlugin(
OpenTelemetryPluginEnd2EndTest::Options options);

Loading…
Cancel
Save