[OTel] Experimental API for metrics (#35348)

Provide a public experimental API and bazel compatible build target for OpenTelemetry metrics.

Details -
* New `OpenTelemetryPluginBuilder` class that provides the API specified in https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
* The existing `grpc::internal::OpenTelemetryPluginBuilder` class is moved to `grpc::internal::OpenTelemetryPluginBuilderImpl` for disambiguation.
* Renamed `OTel` in some instances to `OpenTelemetry` for consistency.

Closes #35348

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35348 from yashykt:OTelPublicApi e32328825e
PiperOrigin-RevId: 594271246
pull/35409/head
Yash Tibrewal 11 months ago committed by Copybara-Service
parent f703530b6d
commit c12a5645f7
  1. 12
      BUILD
  2. 11
      include/grpcpp/ext/csm_observability.h
  3. 108
      include/grpcpp/ext/otel_plugin.h
  4. 20
      src/cpp/ext/csm/csm_observability.cc
  5. 1
      src/cpp/ext/otel/BUILD
  6. 10
      src/cpp/ext/otel/key_value_iterable.h
  7. 74
      src/cpp/ext/otel/otel_client_filter.cc
  8. 227
      src/cpp/ext/otel/otel_plugin.cc
  9. 54
      src/cpp/ext/otel/otel_plugin.h
  10. 60
      src/cpp/ext/otel/otel_server_call_tracer.cc
  11. 1
      test/cpp/ext/csm/BUILD
  12. 24
      test/cpp/ext/csm/metadata_exchange_test.cc
  13. 112
      test/cpp/ext/otel/otel_plugin_test.cc
  14. 15
      test/cpp/ext/otel/otel_test_library.cc
  15. 2
      test/cpp/ext/otel/otel_test_library.h
  16. 2
      test/cpp/interop/observability_client.cc
  17. 2
      test/cpp/interop/xds_interop_client.cc
  18. 2
      test/cpp/interop/xds_interop_server.cc

12
BUILD

@ -2404,6 +2404,18 @@ grpc_cc_library(
],
)
# This is an EXPERIMENTAL target subject to change.
grpc_cc_library(
name = "grpcpp_otel_plugin",
hdrs = [
"include/grpcpp/ext/otel_plugin.h",
],
language = "c++",
deps = [
"//src/cpp/ext/otel:otel_plugin",
],
)
grpc_cc_library(
name = "work_serializer",
srcs = [

@ -28,9 +28,12 @@
#include "absl/strings/string_view.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
class OpenTelemetryPluginBuilderImpl;
} // namespace internal
namespace experimental {
// This is a no-op at present, but in the future, this object would be useful
@ -41,6 +44,8 @@ class CsmObservability {};
// for a binary running on CSM.
class CsmObservabilityBuilder {
public:
CsmObservabilityBuilder();
~CsmObservabilityBuilder();
CsmObservabilityBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider);
@ -80,7 +85,7 @@ class CsmObservabilityBuilder {
absl::StatusOr<CsmObservability> BuildAndRegister();
private:
internal::OpenTelemetryPluginBuilder builder_;
std::unique_ptr<grpc::internal::OpenTelemetryPluginBuilderImpl> builder_;
};
} // namespace experimental

@ -0,0 +1,108 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPCPP_EXT_OTEL_PLUGIN_H
#define GRPCPP_EXT_OTEL_PLUGIN_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <memory>
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "opentelemetry/metrics/meter_provider.h"
namespace grpc {
namespace internal {
class OpenTelemetryPluginBuilderImpl;
} // namespace internal
namespace experimental {
/// The most common way to use this API is -
///
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
///
/// The set of instruments available are -
/// grpc.client.attempt.started
/// grpc.client.attempt.duration
/// grpc.client.attempt.sent_total_compressed_message_size
/// grpc.client.attempt.rcvd_total_compressed_message_size
/// grpc.server.call.started
/// grpc.server.call.duration
/// grpc.server.call.sent_total_compressed_message_size
/// grpc.server.call.rcvd_total_compressed_message_size
class OpenTelemetryPluginBuilder {
public:
/// Metrics
static constexpr absl::string_view kClientAttemptStartedInstrumentName =
"grpc.client.attempt.started";
static constexpr absl::string_view kClientAttemptDurationInstrumentName =
"grpc.client.attempt.duration";
static constexpr absl::string_view
kClientAttemptSentTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.sent_total_compressed_message_size";
static constexpr absl::string_view
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
static constexpr absl::string_view kServerCallStartedInstrumentName =
"grpc.server.call.started";
static constexpr absl::string_view kServerCallDurationInstrumentName =
"grpc.server.call.duration";
static constexpr absl::string_view
kServerCallSentTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.sent_total_compressed_message_size";
static constexpr absl::string_view
kServerCallRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.rcvd_total_compressed_message_size";
OpenTelemetryPluginBuilder();
/// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
/// If set, \a target_attribute_filter is called per channel to decide whether
/// to record the target attribute on client or to replace it with "other".
/// This helps reduce the cardinality on metrics in cases where many channels
/// are created with different targets in the same binary (which might happen
/// for example, if the channel target string uses IP addresses directly).
OpenTelemetryPluginBuilder& SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter);
/// If set, \a generic_method_attribute_filter is called per call with a
/// generic method type to decide whether to record the method name or to
/// replace it with "other". Non-generic or pre-registered methods remain
/// unaffected. If not set, by default, generic method names are replaced with
/// "other" when recording metrics.
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
/// Registers a global plugin that acts on all channels and servers running on
/// the process.
void BuildAndRegisterGlobal();
private:
std::unique_ptr<internal::OpenTelemetryPluginBuilderImpl> impl_;
};
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_EXT_OTEL_PLUGIN_H

@ -48,17 +48,23 @@ namespace experimental {
// CsmObservabilityBuilder
//
CsmObservabilityBuilder::CsmObservabilityBuilder()
: builder_(
std::make_unique<grpc::internal::OpenTelemetryPluginBuilderImpl>()) {}
CsmObservabilityBuilder::~CsmObservabilityBuilder() = default;
CsmObservabilityBuilder& CsmObservabilityBuilder::SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider) {
builder_.SetMeterProvider(meter_provider);
builder_->SetMeterProvider(meter_provider);
return *this;
}
CsmObservabilityBuilder& CsmObservabilityBuilder::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
builder_.SetTargetAttributeFilter(std::move(target_attribute_filter));
builder_->SetTargetAttributeFilter(std::move(target_attribute_filter));
return *this;
}
@ -66,22 +72,22 @@ CsmObservabilityBuilder&
CsmObservabilityBuilder::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
builder_.SetGenericMethodAttributeFilter(
builder_->SetGenericMethodAttributeFilter(
std::move(generic_method_attribute_filter));
return *this;
}
absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
builder_.SetServerSelector([](const grpc_core::ChannelArgs& args) {
builder_->SetServerSelector([](const grpc_core::ChannelArgs& args) {
return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
});
builder_.SetTargetSelector(internal::CsmChannelTargetSelector);
builder_.SetLabelsInjector(
builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
builder_->SetLabelsInjector(
std::make_unique<internal::ServiceMeshLabelsInjector>(
google::cloud::otel::MakeResourceDetector()
->Detect()
.GetAttributes()));
builder_.BuildAndRegisterGlobal();
builder_->BuildAndRegisterGlobal();
return CsmObservability();
}

@ -41,6 +41,7 @@ grpc_cc_library(
"otel_client_filter.h",
"otel_plugin.h",
"otel_server_call_tracer.h",
"//:include/grpcpp/ext/otel_plugin.h",
],
external_deps = [
"absl/base:core_headers",

@ -38,7 +38,7 @@
namespace grpc {
namespace internal {
inline opentelemetry::nostd::string_view AbslStrViewToOTelStrView(
inline opentelemetry::nostd::string_view AbslStrViewToOpenTelemetryStrView(
absl::string_view str) {
return opentelemetry::nostd::string_view(str.data(), str.size());
}
@ -62,15 +62,15 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
if (injected_labels_iterable_ != nullptr) {
injected_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = injected_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOTelStrView(pair->first),
AbslStrViewToOTelStrView(pair->second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
AbslStrViewToOpenTelemetryStrView(pair->second))) {
return false;
}
}
}
for (const auto& pair : additional_labels_) {
if (!callback(AbslStrViewToOTelStrView(pair.first),
AbslStrViewToOTelStrView(pair.second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair.first),
AbslStrViewToOpenTelemetryStrView(pair.second))) {
return false;
}
}

@ -76,8 +76,8 @@ absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
// Use the original target string only if a filter on the attribute is not
// registered or if the filter returns true, otherwise use "other".
if (OTelPluginState().target_attribute_filter == nullptr ||
OTelPluginState().target_attribute_filter(target)) {
if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
OpenTelemetryPluginState().target_attribute_filter(target)) {
return OpenTelemetryClientFilter(std::move(target));
}
return OpenTelemetryClientFilter("other");
@ -116,13 +116,14 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
if (OTelPluginState().client.attempt.started != nullptr) {
if (OpenTelemetryPluginState().client.attempt.started != nullptr) {
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()}}};
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OTelPluginState().client.attempt.started->Add(
OpenTelemetryPluginState().client.attempt.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
additional_labels));
}
@ -130,17 +131,17 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
injected_labels_ =
OTelPluginState().labels_injector->GetLabels(recv_initial_metadata);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
}
@ -175,32 +176,35 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) {
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()},
{OTelStatusKey(), grpc_status_code_to_string(
static_cast<grpc_status_code>(
status.code()))}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(injected_labels_.get(), additional_labels);
if (OTelPluginState().client.attempt.duration != nullptr) {
OTelPluginState().client.attempt.duration->Record(
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.sent_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.rcvd_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
}
@ -273,8 +277,8 @@ OpenTelemetryCallTracer::StartNewAttempt(bool is_transparent_retry) {
absl::string_view OpenTelemetryCallTracer::MethodForStats() const {
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
if (registered_method_ ||
(OTelPluginState().generic_method_attribute_filter != nullptr &&
OTelPluginState().generic_method_attribute_filter(method))) {
(OpenTelemetryPluginState().generic_method_attribute_filter != nullptr &&
OpenTelemetryPluginState().generic_method_attribute_filter(method))) {
return method;
}
return "other";

@ -29,6 +29,7 @@
#include "opentelemetry/nostd/unique_ptr.h"
#include <grpc/support/log.h>
#include <grpcpp/ext/otel_plugin.h>
#include <grpcpp/version_info.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
@ -42,199 +43,194 @@
namespace grpc {
namespace internal {
// TODO(yashykt): Extend this to allow multiple OTel plugins to be registered in
// the same binary.
struct OTelPluginState* g_otel_plugin_state_;
// TODO(yashykt): Extend this to allow multiple OpenTelemetry plugins to be
// registered in the same binary.
struct OpenTelemetryPluginState* g_otel_plugin_state_;
const struct OTelPluginState& OTelPluginState() {
const struct OpenTelemetryPluginState& OpenTelemetryPluginState() {
GPR_DEBUG_ASSERT(g_otel_plugin_state_ != nullptr);
return *g_otel_plugin_state_;
}
absl::string_view OTelMethodKey() { return "grpc.method"; }
absl::string_view OpenTelemetryMethodKey() { return "grpc.method"; }
absl::string_view OTelStatusKey() { return "grpc.status"; }
absl::string_view OpenTelemetryStatusKey() { return "grpc.status"; }
absl::string_view OTelTargetKey() { return "grpc.target"; }
absl::string_view OTelClientAttemptStartedInstrumentName() {
return "grpc.client.attempt.started";
}
absl::string_view OTelClientAttemptDurationInstrumentName() {
return "grpc.client.attempt.duration";
}
absl::string_view
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName() {
return "grpc.client.attempt.sent_total_compressed_message_size";
}
absl::string_view
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName() {
return "grpc.client.attempt.rcvd_total_compressed_message_size";
}
absl::string_view OTelServerCallStartedInstrumentName() {
return "grpc.server.call.started";
}
absl::string_view OTelServerCallDurationInstrumentName() {
return "grpc.server.call.duration";
}
absl::string_view OTelServerCallSentTotalCompressedMessageSizeInstrumentName() {
return "grpc.server.call.sent_total_compressed_message_size";
}
absl::string_view OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName() {
return "grpc.server.call.rcvd_total_compressed_message_size";
}
absl::string_view OpenTelemetryTargetKey() { return "grpc.target"; }
namespace {
absl::flat_hash_set<std::string> BaseMetrics() {
return absl::flat_hash_set<std::string>{
std::string(OTelClientAttemptStartedInstrumentName()),
std::string(OTelClientAttemptDurationInstrumentName()),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName),
std::string(
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName()),
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
std::string(
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName()),
std::string(OTelServerCallStartedInstrumentName()),
std::string(OTelServerCallDurationInstrumentName()),
std::string(OTelServerCallSentTotalCompressedMessageSizeInstrumentName()),
std::string(
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName())};
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName)};
}
} // namespace
//
// OpenTelemetryPluginBuilder
// OpenTelemetryPluginBuilderImpl
//
OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
OpenTelemetryPluginBuilderImpl::OpenTelemetryPluginBuilderImpl()
: metrics_(BaseMetrics()) {}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
meter_provider_ = std::move(meter_provider);
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::EnableMetric(
OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::EnableMetric(
absl::string_view metric_name) {
metrics_.emplace(metric_name);
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableMetric(
OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::DisableMetric(
absl::string_view metric_name) {
metrics_.erase(metric_name);
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::DisableAllMetrics() {
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::DisableAllMetrics() {
metrics_.clear();
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetLabelsInjector(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector) {
labels_injector_ = std::move(labels_injector);
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTargetSelector(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector) {
target_selector_ = std::move(target_selector);
return *this;
}
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
target_attribute_filter_ = std::move(target_attribute_filter);
return *this;
}
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
generic_method_attribute_filter_ = std::move(generic_method_attribute_filter);
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetServerSelector(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetServerSelector(
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector) {
server_selector_ = std::move(server_selector);
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
void OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
delete g_otel_plugin_state_;
g_otel_plugin_state_ = new struct OTelPluginState;
g_otel_plugin_state_ = new struct OpenTelemetryPluginState;
if (meter_provider == nullptr) {
return;
}
auto meter = meter_provider->GetMeter("grpc-c++", GRPC_CPP_VERSION_STRING);
if (metrics_.contains(OTelClientAttemptStartedInstrumentName())) {
if (metrics_.contains(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName)) {
g_otel_plugin_state_->client.attempt.started = meter->CreateUInt64Counter(
std::string(OTelClientAttemptStartedInstrumentName()),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName),
"Number of client call attempts started", "{attempt}");
}
if (metrics_.contains(OTelClientAttemptDurationInstrumentName())) {
if (metrics_.contains(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName)) {
g_otel_plugin_state_->client.attempt.duration =
meter->CreateDoubleHistogram(
std::string(OTelClientAttemptDurationInstrumentName()),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName),
"End-to-end time taken to complete a client call attempt", "s");
}
if (metrics_.contains(
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName())) {
g_otel_plugin_state_->client.attempt
.sent_total_compressed_message_size = meter->CreateUInt64Histogram(
std::string(
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName()),
"Compressed message bytes sent per client call attempt", "By");
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName)) {
g_otel_plugin_state_->client.attempt.sent_total_compressed_message_size =
meter->CreateUInt64Histogram(
std::string(
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes sent per client call attempt", "By");
}
if (metrics_.contains(
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName())) {
g_otel_plugin_state_->client.attempt
.rcvd_total_compressed_message_size = meter->CreateUInt64Histogram(
std::string(
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName()),
"Compressed message bytes received per call attempt", "By");
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName)) {
g_otel_plugin_state_->client.attempt.rcvd_total_compressed_message_size =
meter->CreateUInt64Histogram(
std::string(
grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes received per call attempt", "By");
}
if (metrics_.contains(OTelServerCallStartedInstrumentName())) {
if (metrics_.contains(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName)) {
g_otel_plugin_state_->server.call.started = meter->CreateUInt64Counter(
std::string(OTelServerCallStartedInstrumentName()),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName),
"Number of server calls started", "{call}");
}
if (metrics_.contains(OTelServerCallDurationInstrumentName())) {
if (metrics_.contains(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName)) {
g_otel_plugin_state_->server.call.duration = meter->CreateDoubleHistogram(
std::string(OTelServerCallDurationInstrumentName()),
std::string(grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName),
"End-to-end time taken to complete a call from server transport's "
"perspective",
"s");
}
if (metrics_.contains(
OTelServerCallSentTotalCompressedMessageSizeInstrumentName())) {
grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName)) {
g_otel_plugin_state_->server.call.sent_total_compressed_message_size =
meter->CreateUInt64Histogram(
std::string(
OTelServerCallSentTotalCompressedMessageSizeInstrumentName()),
grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes sent per server call", "By");
}
if (metrics_.contains(
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName())) {
grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName)) {
g_otel_plugin_state_->server.call.rcvd_total_compressed_message_size =
meter->CreateUInt64Histogram(
std::string(
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()),
grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes received per server call", "By");
}
g_otel_plugin_state_->labels_injector = std::move(labels_injector_);
@ -264,4 +260,59 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
}
} // namespace internal
namespace experimental {
constexpr absl::string_view
OpenTelemetryPluginBuilder::kClientAttemptStartedInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kClientAttemptDurationInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kServerCallStartedInstrumentName;
constexpr absl::string_view
OpenTelemetryPluginBuilder::kServerCallDurationInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName;
constexpr absl::string_view OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName;
//
// OpenTelemetryPluginBuilder
//
OpenTelemetryPluginBuilder::OpenTelemetryPluginBuilder()
: impl_(std::make_unique<internal::OpenTelemetryPluginBuilderImpl>()) {}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider) {
impl_->SetMeterProvider(std::move(meter_provider));
return *this;
}
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
impl_->SetTargetAttributeFilter(std::move(target_attribute_filter));
return *this;
}
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
impl_->SetGenericMethodAttributeFilter(
std::move(generic_method_attribute_filter));
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
impl_->BuildAndRegisterGlobal();
}
} // namespace experimental
} // namespace grpc

@ -43,7 +43,7 @@ namespace grpc {
namespace internal {
// An iterable container interface that can be used as a return type for the
// OTel plugin's label injector.
// OpenTelemetry plugin's label injector.
class LabelsIterable {
public:
virtual ~LabelsIterable() = default;
@ -60,7 +60,7 @@ class LabelsIterable {
};
// An interface that allows you to add additional labels on the calls traced
// through the OTel plugin.
// through the OpenTelemetry plugin.
class LabelsInjector {
public:
virtual ~LabelsInjector() {}
@ -77,7 +77,7 @@ class LabelsInjector {
LabelsIterable* labels_from_incoming_metadata) = 0;
};
struct OTelPluginState {
struct OpenTelemetryPluginState {
struct Client {
struct Attempt {
std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
@ -109,33 +109,21 @@ struct OTelPluginState {
server_selector;
};
const struct OTelPluginState& OTelPluginState();
const struct OpenTelemetryPluginState& OpenTelemetryPluginState();
// Tags
absl::string_view OTelMethodKey();
absl::string_view OTelStatusKey();
absl::string_view OTelTargetKey();
// Metrics
absl::string_view OTelClientAttemptStartedInstrumentName();
absl::string_view OTelClientAttemptDurationInstrumentName();
absl::string_view
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName();
absl::string_view
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName();
absl::string_view OTelServerCallStartedInstrumentName();
absl::string_view OTelServerCallDurationInstrumentName();
absl::string_view OTelServerCallSentTotalCompressedMessageSizeInstrumentName();
absl::string_view OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName();
class OpenTelemetryPluginBuilder {
absl::string_view OpenTelemetryMethodKey();
absl::string_view OpenTelemetryStatusKey();
absl::string_view OpenTelemetryTargetKey();
class OpenTelemetryPluginBuilderImpl {
public:
OpenTelemetryPluginBuilder();
OpenTelemetryPluginBuilderImpl();
// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilder& SetMeterProvider(
OpenTelemetryPluginBuilderImpl& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
// Methods to manipulate which instruments are enabled in the OTel Stats
// Plugin. The default set of instruments are -
// Methods to manipulate which instruments are enabled in the OpenTelemetry
// Stats Plugin. The default set of instruments are -
// grpc.client.attempt.started
// grpc.client.attempt.duration
// grpc.client.attempt.sent_total_compressed_message_size
@ -144,22 +132,22 @@ class OpenTelemetryPluginBuilder {
// grpc.server.call.duration
// grpc.server.call.sent_total_compressed_message_size
// grpc.server.call.rcvd_total_compressed_message_size
OpenTelemetryPluginBuilder& EnableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilder& DisableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilder& DisableAllMetrics();
OpenTelemetryPluginBuilderImpl& EnableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilderImpl& DisableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilderImpl& DisableAllMetrics();
// Allows setting a labels injector on calls traced through this plugin.
OpenTelemetryPluginBuilder& SetLabelsInjector(
OpenTelemetryPluginBuilderImpl& SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector);
// If set, \a target_selector is called per channel to decide whether to
// collect metrics on that target or not.
OpenTelemetryPluginBuilder& SetTargetSelector(
OpenTelemetryPluginBuilderImpl& SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector);
// If set, \a server_selector is called per incoming call on the server
// to decide whether to collect metrics on that call or not.
// TODO(yashkt): We should only need to do this per server connection or even
// per server. Change this when we have a ServerTracer.
OpenTelemetryPluginBuilder& SetServerSelector(
OpenTelemetryPluginBuilderImpl& SetServerSelector(
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>
server_selector);
// If set, \a target_attribute_filter is called per channel to decide whether
@ -167,7 +155,7 @@ class OpenTelemetryPluginBuilder {
// This helps reduce the cardinality on metrics in cases where many channels
// are created with different targets in the same binary (which might happen
// for example, if the channel target string uses IP addresses directly).
OpenTelemetryPluginBuilder& SetTargetAttributeFilter(
OpenTelemetryPluginBuilderImpl& SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter);
// If set, \a generic_method_attribute_filter is called per call with a
@ -175,7 +163,7 @@ class OpenTelemetryPluginBuilder {
// replace it with "other". Non-generic or pre-registered methods remain
// unaffected. If not set, by default, generic method names are replaced with
// "other" when recording metrics.
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
OpenTelemetryPluginBuilderImpl& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
void BuildAndRegisterGlobal();

@ -77,9 +77,9 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
injected_labels_.get());
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(
send_initial_metadata, injected_labels_.get());
}
}
@ -135,8 +135,9 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
absl::string_view MethodForStats() const {
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
if (registered_method_ ||
(OTelPluginState().generic_method_attribute_filter != nullptr &&
OTelPluginState().generic_method_attribute_filter(method))) {
(OpenTelemetryPluginState().generic_method_attribute_filter !=
nullptr &&
OpenTelemetryPluginState().generic_method_attribute_filter(method))) {
return method;
}
return "other";
@ -153,19 +154,19 @@ void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
path_ =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata())->Ref();
if (OTelPluginState().labels_injector != nullptr) {
injected_labels_ =
OTelPluginState().labels_injector->GetLabels(recv_initial_metadata);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
registered_method_ =
recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
.value_or(nullptr) != nullptr;
std::array<std::pair<absl::string_view, absl::string_view>, 1>
additional_labels = {{{OTelMethodKey(), MethodForStats()}}};
if (OTelPluginState().server.call.started != nullptr) {
additional_labels = {{{OpenTelemetryMethodKey(), MethodForStats()}}};
if (OpenTelemetryPluginState().server.call.started != nullptr) {
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OTelPluginState().server.call.started->Add(
OpenTelemetryPluginState().server.call.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
additional_labels));
}
@ -181,26 +182,29 @@ void OpenTelemetryServerCallTracer::RecordSendTrailingMetadata(
void OpenTelemetryServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {{{OTelMethodKey(), MethodForStats()},
{OTelStatusKey(), grpc_status_code_to_string(
final_info->final_status)}}};
additional_labels = {
{{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(final_info->final_status)}}};
KeyValueIterable labels(injected_labels_.get(), additional_labels);
if (OTelPluginState().server.call.duration != nullptr) {
OTelPluginState().server.call.duration->Record(
if (OpenTelemetryPluginState().server.call.duration != nullptr) {
OpenTelemetryPluginState().server.call.duration->Record(
absl::ToDoubleSeconds(elapsed_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().server.call.sent_total_compressed_message_size !=
nullptr) {
OTelPluginState().server.call.sent_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.outgoing.data_bytes, labels,
opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.server.call.sent_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.server.call.sent_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.outgoing.data_bytes,
labels, opentelemetry::context::Context{});
}
if (OTelPluginState().server.call.rcvd_total_compressed_message_size !=
nullptr) {
OTelPluginState().server.call.rcvd_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.incoming.data_bytes, labels,
opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.server.call.rcvd_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.server.call.rcvd_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.incoming.data_bytes,
labels, opentelemetry::context::Context{});
}
}
@ -220,8 +224,8 @@ bool OpenTelemetryServerCallTracerFactory::IsServerTraced(
const grpc_core::ChannelArgs& args) {
// Return true only if there is no server selector registered or if the server
// selector returns true.
return OTelPluginState().server_selector == nullptr ||
OTelPluginState().server_selector(args);
return OpenTelemetryPluginState().server_selector == nullptr ||
OpenTelemetryPluginState().server_selector(args);
}
} // namespace internal

@ -37,6 +37,7 @@ grpc_cc_test(
deps = [
"//:grpc++",
"//src/cpp/ext/csm:csm_observability",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/util:grpc_test_util",
],
)

@ -26,6 +26,7 @@
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/ext/otel_plugin.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
@ -108,7 +109,7 @@ class TestScenario {
};
class MetadataExchangeTest
: public OTelPluginEnd2EndTest,
: public OpenTelemetryPluginEnd2EndTest,
public ::testing::WithParamInterface<TestScenario> {
protected:
void Init(const absl::flat_hash_set<absl::string_view>& metric_names,
@ -131,7 +132,7 @@ class MetadataExchangeTest
grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", kBootstrap);
break;
}
OTelPluginEnd2EndTest::Init(
OpenTelemetryPluginEnd2EndTest::Init(
metric_names, /*resource=*/GetParam().GetTestResource(),
/*labels_injector=*/
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(
@ -205,8 +206,8 @@ class MetadataExchangeTest
// Verify that grpc.client.attempt.started does not get service mesh attributes
TEST_P(MetadataExchangeTest, ClientAttemptStarted) {
Init(/*metric_names=*/{
grpc::internal::OTelClientAttemptStartedInstrumentName()});
Init(/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName});
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
@ -229,8 +230,8 @@ TEST_P(MetadataExchangeTest, ClientAttemptStarted) {
}
TEST_P(MetadataExchangeTest, ClientAttemptDuration) {
Init(/*metric_names=*/{
grpc::internal::OTelClientAttemptDurationInstrumentName()});
Init(/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
@ -255,7 +256,8 @@ TEST_P(MetadataExchangeTest, ClientAttemptDuration) {
// Verify that grpc.server.call.started does not get service mesh attributes
TEST_P(MetadataExchangeTest, ServerCallStarted) {
Init(
/*metric_names=*/{grpc::internal::OTelServerCallStartedInstrumentName()});
/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName});
SendRPC();
const char* kMetricName = "grpc.server.call.started";
auto data = ReadCurrentMetricsData(
@ -274,8 +276,9 @@ TEST_P(MetadataExchangeTest, ServerCallStarted) {
}
TEST_P(MetadataExchangeTest, ServerCallDuration) {
Init(/*metric_names=*/{
grpc::internal::OTelServerCallDurationInstrumentName()});
Init(
/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName});
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
@ -298,7 +301,8 @@ TEST_P(MetadataExchangeTest, ServerCallDuration) {
// Test that the server records unknown when the client does not send metadata
TEST_P(MetadataExchangeTest, ClientDoesNotSendMetadata) {
Init(
/*metric_names=*/{grpc::internal::OTelServerCallDurationInstrumentName()},
/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName},
/*enable_client_side_injector=*/false);
SendRPC();
const char* kMetricName = "grpc.server.call.duration";

@ -25,6 +25,7 @@
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/ext/otel_plugin.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
@ -37,16 +38,21 @@ namespace grpc {
namespace testing {
namespace {
TEST(OTelPluginBuildTest, ApiDependency) {
TEST(OpenTelemetryPluginBuildTest, ApiDependency) {
opentelemetry::metrics::Provider::GetMeterProvider();
}
TEST(OTelPluginBuildTest, SdkDependency) {
TEST(OpenTelemetryPluginBuildTest, SdkDependency) {
opentelemetry::sdk::metrics::MeterProvider();
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptStarted) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()});
TEST(OpenTelemetryPluginBuildTest, Basic) {
grpc::experimental::OpenTelemetryPluginBuilder builder;
}
TEST_F(OpenTelemetryPluginEnd2EndTest, ClientAttemptStarted) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName});
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
@ -73,8 +79,9 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptStarted) {
EXPECT_EQ(*target_value, canonical_server_address_);
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptDuration) {
Init({grpc::internal::OTelClientAttemptDurationInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest, ClientAttemptDuration) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
@ -104,9 +111,10 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptDuration) {
EXPECT_EQ(*status_value, "OK");
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptSentTotalCompressedMessageSize) {
Init({grpc::internal::
OTelClientAttemptSentTotalCompressedMessageSizeInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest,
ClientAttemptSentTotalCompressedMessageSize) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptSentTotalCompressedMessageSizeInstrumentName});
SendRPC();
const char* kMetricName =
"grpc.client.attempt.sent_total_compressed_message_size";
@ -138,9 +146,10 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptSentTotalCompressedMessageSize) {
EXPECT_EQ(*status_value, "OK");
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptRcvdTotalCompressedMessageSize) {
Init({grpc::internal::
OTelClientAttemptRcvdTotalCompressedMessageSizeInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest,
ClientAttemptRcvdTotalCompressedMessageSize) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName});
SendRPC();
const char* kMetricName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
@ -172,8 +181,9 @@ TEST_F(OTelPluginEnd2EndTest, ClientAttemptRcvdTotalCompressedMessageSize) {
EXPECT_EQ(*status_value, "OK");
}
TEST_F(OTelPluginEnd2EndTest, ServerCallStarted) {
Init({grpc::internal::OTelServerCallStartedInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest, ServerCallStarted) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName});
SendRPC();
const char* kMetricName = "grpc.server.call.started";
auto data = ReadCurrentMetricsData(
@ -196,8 +206,9 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallStarted) {
EXPECT_EQ(*method_value, kMethodName);
}
TEST_F(OTelPluginEnd2EndTest, ServerCallDuration) {
Init({grpc::internal::OTelServerCallDurationInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest, ServerCallDuration) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName});
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
@ -223,9 +234,10 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallDuration) {
EXPECT_EQ(*status_value, "OK");
}
TEST_F(OTelPluginEnd2EndTest, ServerCallSentTotalCompressedMessageSize) {
Init({grpc::internal::
OTelServerCallSentTotalCompressedMessageSizeInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest,
ServerCallSentTotalCompressedMessageSize) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallSentTotalCompressedMessageSizeInstrumentName});
SendRPC();
const char* kMetricName =
"grpc.server.call.sent_total_compressed_message_size";
@ -253,9 +265,10 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallSentTotalCompressedMessageSize) {
EXPECT_EQ(*status_value, "OK");
}
TEST_F(OTelPluginEnd2EndTest, ServerCallRcvdTotalCompressedMessageSize) {
Init({grpc::internal::
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest,
ServerCallRcvdTotalCompressedMessageSize) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallRcvdTotalCompressedMessageSizeInstrumentName});
SendRPC();
const char* kMetricName =
"grpc.server.call.rcvd_total_compressed_message_size";
@ -284,8 +297,9 @@ TEST_F(OTelPluginEnd2EndTest, ServerCallRcvdTotalCompressedMessageSize) {
}
// Make sure that no meter provider results in normal operations.
TEST_F(OTelPluginEnd2EndTest, NoMeterProviderRegistered) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()},
TEST_F(OpenTelemetryPluginEnd2EndTest, NoMeterProviderRegistered) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/true);
@ -293,8 +307,9 @@ TEST_F(OTelPluginEnd2EndTest, NoMeterProviderRegistered) {
}
// Test that a channel selector returning true records metrics on the channel.
TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsTrue) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
TEST_F(OpenTelemetryPluginEnd2EndTest, TargetSelectorReturnsTrue) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -328,8 +343,9 @@ TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsTrue) {
// Test that a target selector returning false does not record metrics on the
// channel.
TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsFalse) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
TEST_F(OpenTelemetryPluginEnd2EndTest, TargetSelectorReturnsFalse) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -346,8 +362,9 @@ TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsFalse) {
// Test that a target attribute filter returning true records metrics with the
// target as is on the channel.
TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsTrue) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
TEST_F(OpenTelemetryPluginEnd2EndTest, TargetAttributeFilterReturnsTrue) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -383,8 +400,9 @@ TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsTrue) {
// Test that a target attribute filter returning false records metrics with the
// target as "other".
TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsFalse) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
TEST_F(OpenTelemetryPluginEnd2EndTest, TargetAttributeFilterReturnsFalse) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -419,8 +437,9 @@ TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsFalse) {
}
// Test that generic method names are scrubbed properly on the client side.
TEST_F(OTelPluginEnd2EndTest, GenericClientRpc) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest, GenericClientRpc) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName});
SendGenericRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
@ -449,9 +468,10 @@ TEST_F(OTelPluginEnd2EndTest, GenericClientRpc) {
// Test that generic method names are scrubbed properly on the client side if
// the method attribute filter is set and it returns false.
TEST_F(OTelPluginEnd2EndTest,
TEST_F(OpenTelemetryPluginEnd2EndTest,
GenericClientRpcWithMethodAttributeFilterReturningFalse) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()},
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -488,9 +508,10 @@ TEST_F(OTelPluginEnd2EndTest,
// Test that generic method names is not scrubbed on the client side if
// the method attribute filter is set and it returns true.
TEST_F(OTelPluginEnd2EndTest,
TEST_F(OpenTelemetryPluginEnd2EndTest,
GenericClientRpcWithMethodAttributeFilterReturningTrue) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()},
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -526,8 +547,9 @@ TEST_F(OTelPluginEnd2EndTest,
}
// Test that generic method names are scrubbed properly on the server side.
TEST_F(OTelPluginEnd2EndTest, GenericServerRpc) {
Init({grpc::internal::OTelServerCallDurationInstrumentName()});
TEST_F(OpenTelemetryPluginEnd2EndTest, GenericServerRpc) {
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName});
SendGenericRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
@ -555,9 +577,10 @@ TEST_F(OTelPluginEnd2EndTest, GenericServerRpc) {
// Test that generic method names are scrubbed properly on the server side if
// the method attribute filter is set and it returns false.
TEST_F(OTelPluginEnd2EndTest,
TEST_F(OpenTelemetryPluginEnd2EndTest,
GenericServerRpcWithMethodAttributeFilterReturningFalse) {
Init({grpc::internal::OTelServerCallDurationInstrumentName()},
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
@ -593,9 +616,10 @@ TEST_F(OTelPluginEnd2EndTest,
// Test that generic method names are not scrubbed on the server side if
// the method attribute filter is set and it returns true.
TEST_F(OTelPluginEnd2EndTest,
TEST_F(OpenTelemetryPluginEnd2EndTest,
GenericServerRpcWithMethodAttributeFilterReturningTrue) {
Init({grpc::internal::OTelServerCallDurationInstrumentName()},
Init({grpc::experimental::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName},
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,

@ -38,7 +38,7 @@
namespace grpc {
namespace testing {
void OTelPluginEnd2EndTest::Init(
void OpenTelemetryPluginEnd2EndTest::Init(
const absl::flat_hash_set<absl::string_view>& metric_names,
opentelemetry::sdk::resource::Resource resource,
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector,
@ -59,7 +59,7 @@ void OTelPluginEnd2EndTest::Init(
reader_.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader_);
grpc_core::CoreConfiguration::Reset();
grpc::internal::OpenTelemetryPluginBuilder ot_builder;
grpc::internal::OpenTelemetryPluginBuilderImpl ot_builder;
ot_builder.DisableAllMetrics();
for (const auto& metric_name : metric_names) {
ot_builder.EnableMetric(metric_name);
@ -96,19 +96,20 @@ void OTelPluginEnd2EndTest::Init(
generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
}
void OTelPluginEnd2EndTest::TearDown() {
void OpenTelemetryPluginEnd2EndTest::TearDown() {
server_->Shutdown();
grpc_shutdown_blocking();
delete grpc_core::ServerCallTracerFactory::Get(grpc_core::ChannelArgs());
grpc_core::ServerCallTracerFactory::RegisterGlobal(nullptr);
}
void OTelPluginEnd2EndTest::ResetStub(std::shared_ptr<Channel> channel) {
void OpenTelemetryPluginEnd2EndTest::ResetStub(
std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
}
void OTelPluginEnd2EndTest::SendRPC() {
void OpenTelemetryPluginEnd2EndTest::SendRPC() {
EchoRequest request;
request.set_message("foo");
EchoResponse response;
@ -116,7 +117,7 @@ void OTelPluginEnd2EndTest::SendRPC() {
grpc::Status status = stub_->Echo(&context, request, &response);
}
void OTelPluginEnd2EndTest::SendGenericRPC() {
void OpenTelemetryPluginEnd2EndTest::SendGenericRPC() {
grpc::ClientContext context;
EchoRequest request;
std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
@ -130,7 +131,7 @@ void OTelPluginEnd2EndTest::SendGenericRPC() {
absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
OTelPluginEnd2EndTest::ReadCurrentMetricsData(
OpenTelemetryPluginEnd2EndTest::ReadCurrentMetricsData(
absl::AnyInvocable<
bool(const absl::flat_hash_map<
std::string,

@ -54,7 +54,7 @@ class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader {
void OnInitialized() noexcept override {}
};
class OTelPluginEnd2EndTest : public ::testing::Test {
class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
protected:
// Note that we can't use SetUp() here since we want to send in parameters.
void Init(

@ -236,7 +236,7 @@ int main(int argc, char** argv) {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
meter_provider->AddMetricReader(std::move(prometheus_exporter));
grpc::internal::OpenTelemetryPluginBuilder otel_builder;
grpc::internal::OpenTelemetryPluginBuilderImpl otel_builder;
otel_builder.SetMeterProvider(std::move(meter_provider));
otel_builder.BuildAndRegisterGlobal();
}

@ -403,7 +403,7 @@ void EnableCsmObservability() {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
meter_provider->AddMetricReader(std::move(prometheus_exporter));
auto observability = grpc::experimental::CsmObservabilityBuilder();
grpc::experimental::CsmObservabilityBuilder observability;
observability.SetMeterProvider(std::move(meter_provider));
auto status = observability.BuildAndRegister();
}

@ -54,7 +54,7 @@ void EnableCsmObservability() {
auto meter_provider =
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
meter_provider->AddMetricReader(std::move(prometheus_exporter));
auto observability = grpc::experimental::CsmObservabilityBuilder();
grpc::experimental::CsmObservabilityBuilder observability;
observability.SetMeterProvider(std::move(meter_provider));
auto status = observability.BuildAndRegister();
}

Loading…
Cancel
Save