[CSM] Modify CsmObservability to CsmOpenTelemetryPluginOption internally (#35803)

Changes -
* `CsmObservability` API will now use the `CsmOpenTelemetryPluginOption` internally. After this change, `CsmObservability` will enable observability for all channels and servers. (Earlier, `CsmObservability` only enabled observability for CSM-enabled channels and servers.) CSM labels will still be added just for CSM-enabled channels and servers.
* Also, we no longer need the ability to set `LabelInjector` on the `OpenTelemetryPluginBuilder` directly. Instead, we always use `PluginOption` to inject the `LabelInjector`. This simplifies the code as well.

Note that `SetTargetSelector` and `SetServerSelector` APIs on the `OpenTelemetryPluginBuilderImpl` are not being deleted yet since we might need them shortly. This is also why `OpenTelemetryPluginBuilderImpl` is not being deleted right now.

Closes #35803

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35803 from yashykt:CsmO11yApisUsePluginOption cf3d65900d
PiperOrigin-RevId: 604323898
pull/35629/head^2
Yash Tibrewal 10 months ago committed by Copybara-Service
parent 06ac429fea
commit 20e5e0cb29
  1. 9
      src/cpp/ext/csm/csm_observability.cc
  2. 27
      src/cpp/ext/otel/key_value_iterable.h
  3. 24
      src/cpp/ext/otel/otel_client_filter.cc
  4. 8
      src/cpp/ext/otel/otel_plugin.cc
  5. 4
      src/cpp/ext/otel/otel_plugin.h
  6. 14
      src/cpp/ext/otel/otel_server_call_tracer.cc
  7. 32
      test/cpp/ext/csm/metadata_exchange_test.cc
  8. 65
      test/cpp/ext/otel/otel_plugin_test.cc
  9. 1
      test/cpp/ext/otel/otel_test_library.cc
  10. 15
      test/cpp/ext/otel/otel_test_library.h

@ -130,13 +130,8 @@ CsmObservabilityBuilder::SetGenericMethodAttributeFilter(
}
absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
builder_->SetServerSelector(internal::CsmServerSelector);
builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
builder_->SetLabelsInjector(
std::make_unique<internal::ServiceMeshLabelsInjector>(
google::cloud::otel::MakeResourceDetector()
->Detect()
.GetAttributes()));
builder_->AddPluginOption(
std::make_unique<grpc::internal::CsmOpenTelemetryPluginOption>());
auto status = builder_->BuildAndRegisterGlobal();
if (!status.ok()) {
return status;

@ -49,7 +49,6 @@ inline opentelemetry::nostd::string_view AbslStrViewToOpenTelemetryStrView(
class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
public:
explicit KeyValueIterable(
LabelsIterable* injected_labels_iterable,
const std::vector<std::unique_ptr<LabelsIterable>>&
injected_labels_from_plugin_options,
absl::Span<const std::pair<absl::string_view, absl::string_view>>
@ -58,8 +57,7 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
bool is_client)
: injected_labels_iterable_(injected_labels_iterable),
injected_labels_from_plugin_options_(
: injected_labels_from_plugin_options_(
injected_labels_from_plugin_options),
additional_labels_(additional_labels),
active_plugin_options_view_(active_plugin_options_view),
@ -70,20 +68,6 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
callback) const noexcept override {
if (injected_labels_iterable_ != nullptr) {
injected_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = injected_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
AbslStrViewToOpenTelemetryStrView(pair->second))) {
return false;
}
}
}
if (OpenTelemetryPluginState().labels_injector != nullptr &&
!OpenTelemetryPluginState().labels_injector->AddOptionalLabels(
is_client_, optional_labels_, callback)) {
return false;
}
if (active_plugin_options_view_ != nullptr &&
!active_plugin_options_view_->ForEach(
[callback, this](
@ -116,9 +100,7 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
}
size_t size() const noexcept override {
size_t size = injected_labels_iterable_ != nullptr
? injected_labels_iterable_->Size()
: 0;
size_t size = 0;
for (const auto& plugin_option_injected_iterable :
injected_labels_from_plugin_options_) {
if (plugin_option_injected_iterable != nullptr) {
@ -126,10 +108,6 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
}
}
size += additional_labels_.size();
if (OpenTelemetryPluginState().labels_injector != nullptr) {
size += OpenTelemetryPluginState().labels_injector->GetOptionalLabelsSize(
is_client_, optional_labels_);
}
if (active_plugin_options_view_ != nullptr) {
active_plugin_options_view_->ForEach(
[&size, this](const InternalOpenTelemetryPluginOption& plugin_option,
@ -143,7 +121,6 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
}
private:
LabelsIterable* injected_labels_iterable_;
const std::vector<std::unique_ptr<LabelsIterable>>&
injected_labels_from_plugin_options_;
absl::Span<const std::pair<absl::string_view, absl::string_view>>

@ -131,19 +131,15 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OpenTelemetryPluginState().client.attempt.started->Add(
1, KeyValueIterable(
/*injected_labels_iterable=*/nullptr, {}, additional_labels,
/*active_plugin_options_view=*/nullptr,
/*optional_labels_span=*/{}, /*is_client=*/true));
1, KeyValueIterable(/*injected_labels_from_plugin_options=*/{},
additional_labels,
/*active_plugin_options_view=*/nullptr,
/*optional_labels_span=*/{}, /*is_client=*/true));
}
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
parent_->parent_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
@ -158,10 +154,6 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
parent_->parent_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
@ -210,10 +202,10 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(
injected_labels_.get(), injected_labels_from_plugin_options_,
additional_labels, &parent_->parent_->active_plugin_options_view(),
optional_labels_array_, /*is_client=*/true);
KeyValueIterable labels(injected_labels_from_plugin_options_,
additional_labels,
&parent_->parent_->active_plugin_options_view(),
optional_labels_array_, /*is_client=*/true);
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,

@ -116,13 +116,6 @@ OpenTelemetryPluginBuilderImpl::DisableAllMetrics() {
return *this;
}
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector) {
labels_injector_ = std::move(labels_injector);
return *this;
}
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
@ -243,7 +236,6 @@ absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes received per server call", "By");
}
g_otel_plugin_state_->labels_injector = std::move(labels_injector_);
g_otel_plugin_state_->target_attribute_filter =
std::move(target_attribute_filter_);
g_otel_plugin_state_->server_selector = std::move(server_selector_);

@ -135,7 +135,6 @@ struct OpenTelemetryPluginState {
} server;
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider;
std::unique_ptr<LabelsInjector> labels_injector;
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter;
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
@ -173,9 +172,6 @@ class OpenTelemetryPluginBuilderImpl {
OpenTelemetryPluginBuilderImpl& EnableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilderImpl& DisableMetric(absl::string_view metric_name);
OpenTelemetryPluginBuilderImpl& DisableAllMetrics();
// Allows setting a labels injector on calls traced through this plugin.
OpenTelemetryPluginBuilderImpl& SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector);
// If set, \a target_selector is called per channel to decide whether to
// collect metrics on that target or not.
OpenTelemetryPluginBuilderImpl& SetTargetSelector(

@ -82,10 +82,6 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override {
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(
send_initial_metadata, injected_labels_.get());
}
active_plugin_options_view_.ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t index) {
@ -162,7 +158,6 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
absl::Time start_time_;
absl::Duration elapsed_time_;
grpc_core::Slice path_;
std::unique_ptr<LabelsIterable> injected_labels_;
bool registered_method_;
ActivePluginOptionsView active_plugin_options_view_;
// TODO(yashykt): It's wasteful to do this per call. When we re-haul the stats
@ -175,10 +170,6 @@ void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
path_ =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata())->Ref();
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
active_plugin_options_view_.ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t index) {
@ -198,7 +189,7 @@ void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OpenTelemetryPluginState().server.call.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
1, KeyValueIterable(/*injected_labels_from_plugin_options=*/{},
additional_labels,
/*active_plugin_options_view=*/nullptr, {},
/*is_client=*/false));
@ -221,8 +212,7 @@ void OpenTelemetryServerCallTracer::RecordEnd(
grpc_status_code_to_string(final_info->final_status)}}};
// Currently we do not have any optional labels on the server side.
KeyValueIterable labels(
injected_labels_.get(), injected_labels_from_plugin_options_,
additional_labels,
injected_labels_from_plugin_options_, additional_labels,
/*active_plugin_options_view=*/nullptr, /*optional_labels_span=*/{},
/*is_client=*/false);
if (OpenTelemetryPluginState().server.call.duration != nullptr) {

@ -121,6 +121,33 @@ class TestScenario {
XdsBootstrapSource bootstrap_source_;
};
// A PluginOption that injects `ServiceMeshLabelsInjector`. (This is different
// from CsmOpenTelemetryPluginOption since it does not restrict itself to just
// CSM channels and servers.)
class MeshLabelsPluginOption
: public grpc::internal::InternalOpenTelemetryPluginOption {
public:
explicit MeshLabelsPluginOption(
const opentelemetry::sdk::common::AttributeMap& map)
: labels_injector_(
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(map)) {}
bool IsActiveOnClientChannel(absl::string_view /*target*/) const override {
return true;
}
bool IsActiveOnServer(const grpc_core::ChannelArgs& /*args*/) const override {
return true;
}
const grpc::internal::LabelsInjector* labels_injector() const override {
return labels_injector_.get();
}
private:
std::unique_ptr<grpc::internal::ServiceMeshLabelsInjector> labels_injector_;
};
class MetadataExchangeTest
: public OpenTelemetryPluginEnd2EndTest,
public ::testing::WithParamInterface<TestScenario> {
@ -149,9 +176,8 @@ class MetadataExchangeTest
OpenTelemetryPluginEnd2EndTest::Init(std::move(
Options()
.set_metric_names(std::move(metric_names))
.set_labels_injector(
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(
GetParam().GetTestResource().GetAttributes()))
.add_plugin_option(std::make_unique<MeshLabelsPluginOption>(
GetParam().GetTestResource().GetAttributes()))
.set_labels_to_inject(std::move(labels_to_inject))
.set_target_selector(
[enable_client_side_injector](absl::string_view /*target*/) {

@ -775,19 +775,15 @@ class CustomPluginOption
};
TEST_F(OpenTelemetryPluginOptionEnd2EndTest, Basic) {
std::vector<
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
plugin_option_list;
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ true,
std::make_pair("key", "value")));
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.set_plugin_options(std::move(plugin_option_list))));
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ true,
std::make_pair("key", "value")))));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
@ -812,19 +808,15 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, Basic) {
}
TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ClientOnlyPluginOption) {
std::vector<
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
plugin_option_list;
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key", "value")));
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.set_plugin_options(std::move(plugin_option_list))));
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key", "value")))));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
@ -850,19 +842,15 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ClientOnlyPluginOption) {
}
TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ServerOnlyPluginOption) {
std::vector<
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
plugin_option_list;
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key", "value")));
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.set_plugin_options(std::move(plugin_option_list))));
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key", "value")))));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
@ -889,32 +877,27 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ServerOnlyPluginOption) {
TEST_F(OpenTelemetryPluginOptionEnd2EndTest,
MultipleEnabledAndDisabledPluginOptions) {
std::vector<
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
plugin_option_list;
plugin_option_list.reserve(5);
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ true,
std::make_pair("key1", "value1")));
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key2", "value2")));
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key3", "value3")));
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key4", "value4")));
plugin_option_list.emplace_back(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key5", "value5")));
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.set_plugin_options(std::move(plugin_option_list))));
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ true,
std::make_pair("key1", "value1")))
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key2", "value2")))
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ true, /*enabled_on_server*/ false,
std::make_pair("key3", "value3")))
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key4", "value4")))
.add_plugin_option(std::make_unique<CustomPluginOption>(
/*enabled_on_client*/ false, /*enabled_on_server*/ true,
std::make_pair("key5", "value5")))));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<

@ -105,7 +105,6 @@ void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
meter_provider->AddMetricReader(reader_);
ot_builder.SetMeterProvider(std::move(meter_provider));
}
ot_builder.SetLabelsInjector(std::move(config.labels_injector));
ot_builder.SetTargetSelector(std::move(config.target_selector));
ot_builder.SetServerSelector(std::move(config.server_selector));
ot_builder.SetTargetAttributeFilter(

@ -68,12 +68,6 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
return *this;
}
Options& set_labels_injector(
std::unique_ptr<grpc::internal::LabelsInjector> injector) {
labels_injector = std::move(injector);
return *this;
}
Options& set_use_meter_provider(bool flag) {
use_meter_provider = flag;
return *this;
@ -111,11 +105,10 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
return *this;
}
Options& set_plugin_options(
std::vector<
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>>
options) {
plugin_options = std::move(options);
Options& add_plugin_option(
std::unique_ptr<grpc::internal::InternalOpenTelemetryPluginOption>
option) {
plugin_options.push_back(std::move(option));
return *this;
}

Loading…
Cancel
Save