From 4811e1dd901e41ec9c826078b5af7a299ec32f89 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 4 Apr 2024 15:33:36 -0700 Subject: [PATCH 01/10] [lint] Remove reference to check_port_platform.py (#36257) Deleted in https://github.com/grpc/grpc/pull/36234 Closes #36257 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36257 from drfloob:cleanup-portplatform-checker 05fe7a1a255abbfecc065265c7f4949236e36489 PiperOrigin-RevId: 621990331 --- tools/distrib/sanitize.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/distrib/sanitize.sh b/tools/distrib/sanitize.sh index bc48ad50743..eea6b397dba 100755 --- a/tools/distrib/sanitize.sh +++ b/tools/distrib/sanitize.sh @@ -23,7 +23,6 @@ tools/distrib/check_naked_includes.py --fix || true tools/distrib/check_copyright.py --fix tools/distrib/add-iwyu.py tools/distrib/check_trailing_newlines.sh --fix -tools/run_tests/sanity/check_port_platform.py --fix tools/run_tests/sanity/check_include_style.py --fix || true tools/distrib/check_namespace_qualification.py --fix tools/distrib/black_code.sh From 130b948100d1dfb1764738bbc6cd058132f0b51b Mon Sep 17 00:00:00 2001 From: Alisha Nanda Date: Thu, 4 Apr 2024 15:49:28 -0700 Subject: [PATCH 02/10] [experiments] extend expiry (#36246) Closes #36246 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36246 from ananda1066:exps 128795c35d5183d3935103aec946973a3f81ca1e PiperOrigin-RevId: 621994634 --- src/core/lib/experiments/experiments.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 0605df1f293..0efa25c628b 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -55,7 +55,7 @@ - name: canary_client_privacy description: If set, canary client privacy - expiry: 2024/04/01 + expiry: 2024/08/01 owner: alishananda@google.com test_tags: [] allow_in_fuzzing_config: false @@ -69,7 +69,7 @@ - name: client_privacy description: If set, client privacy - expiry: 2024/04/01 + expiry: 2024/08/01 owner: alishananda@google.com test_tags: [] allow_in_fuzzing_config: false @@ -95,7 +95,7 @@ uses_polling: true - name: free_large_allocator description: If set, return all free bytes from a "big" allocator - expiry: 2024/04/01 + expiry: 2024/08/01 owner: alishananda@google.com test_tags: [resource_quota_test] - name: http2_stats_fix @@ -187,7 +187,7 @@ - name: server_privacy description: If set, server privacy - expiry: 2024/04/01 + expiry: 2024/08/01 owner: alishananda@google.com test_tags: [] allow_in_fuzzing_config: false From 4ed5001a271e8df12131e284c65162046e861f30 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 4 Apr 2024 15:54:58 -0700 Subject: [PATCH 03/10] [build] add visibility to handshaker targets PiperOrigin-RevId: 621996203 --- src/core/BUILD | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/core/BUILD b/src/core/BUILD index 9f9b94264b4..2e5ea79b716 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1207,6 +1207,7 @@ grpc_cc_library( public_hdrs = [ "lib/transport/handshaker_factory.h", ], + visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ "channel_args", "iomgr_fwd", @@ -1223,6 +1224,7 @@ grpc_cc_library( public_hdrs = [ "lib/transport/handshaker_registry.h", ], + visibility = ["@grpc:alt_grpc_base_legacy"], deps = [ "channel_args", "handshaker_factory", @@ -1627,6 +1629,7 @@ grpc_cc_library( hdrs = [ "lib/iomgr/iomgr_fwd.h", ], + visibility = ["@grpc:alt_grpc_base_legacy"], deps = ["//:gpr_platform"], ) From d37726298b52dad7e8e926f7c122b57b48afdea8 Mon Sep 17 00:00:00 2001 From: Yijie Ma Date: Fri, 5 Apr 2024 08:11:59 -0700 Subject: [PATCH 04/10] [OpenTelemetry] Implement async gauges (#36182) Closes #36182 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36182 from yijiem:grpc-metrics-async-gauge 8614485f3dddb514b0fbda4977e1425f41312984 PiperOrigin-RevId: 622182710 --- src/core/lib/channel/metrics.h | 14 +- src/cpp/ext/otel/BUILD | 2 + src/cpp/ext/otel/otel_plugin.cc | 330 ++++++++++++++++- src/cpp/ext/otel/otel_plugin.h | 75 +++- test/cpp/ext/otel/otel_plugin_test.cc | 479 ++++++++++++++++++++++--- test/cpp/ext/otel/otel_test_library.cc | 48 +++ test/cpp/ext/otel/otel_test_library.h | 26 ++ 7 files changed, 892 insertions(+), 82 deletions(-) diff --git a/src/core/lib/channel/metrics.h b/src/core/lib/channel/metrics.h index 9bd1ba82972..1dd79696552 100644 --- a/src/core/lib/channel/metrics.h +++ b/src/core/lib/channel/metrics.h @@ -87,9 +87,10 @@ class GlobalInstrumentsRegistry { struct GlobalDoubleHistogramHandle : public GlobalInstrumentHandle {}; struct GlobalInt64GaugeHandle : public GlobalInstrumentHandle {}; struct GlobalDoubleGaugeHandle : public GlobalInstrumentHandle {}; - struct GlobalCallbackHandle : public GlobalInstrumentHandle {}; - struct GlobalCallbackInt64GaugeHandle : public GlobalCallbackHandle {}; - struct GlobalCallbackDoubleGaugeHandle : public GlobalCallbackHandle {}; + struct GlobalCallbackInt64GaugeHandle : public GlobalInstrumentHandle {}; + struct GlobalCallbackDoubleGaugeHandle : public GlobalInstrumentHandle {}; + using GlobalCallbackHandle = absl::variant; // Creates instrument in the GlobalInstrumentsRegistry. static GlobalUInt64CounterHandle RegisterUInt64Counter( @@ -317,13 +318,16 @@ class GlobalStatsPluginRegistry { // Registers a callback to be used to populate callback metrics. // The callback will update the specified metrics. The callback - // will be invoked no more often than min_interval. + // will be invoked no more often than min_interval. Multiple callbacks may + // be registered for the same metrics, as long as no two callbacks report + // data for the same set of labels in which case the behavior is undefined. // // The returned object is a handle that allows the caller to control // the lifetime of the callback; when the returned object is // destroyed, the callback is de-registered. The returned object // must not outlive the StatsPluginGroup object that created it. - std::unique_ptr RegisterCallback( + GRPC_MUST_USE_RESULT std::unique_ptr + RegisterCallback( absl::AnyInvocable callback, std::vector metrics, Duration min_interval = Duration::Seconds(5)); diff --git a/src/cpp/ext/otel/BUILD b/src/cpp/ext/otel/BUILD index 824ae9eea71..588c769652d 100644 --- a/src/cpp/ext/otel/BUILD +++ b/src/cpp/ext/otel/BUILD @@ -44,6 +44,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/container:flat_hash_map", "absl/container:flat_hash_set", "absl/functional:any_invocable", "absl/status", @@ -75,6 +76,7 @@ grpc_cc_library( "//src/core:channel_stack_type", "//src/core:context", "//src/core:error", + "//src/core:match", "//src/core:metadata_batch", "//src/core:metrics", "//src/core:slice", diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc index e4e1e50b0a7..4b9e69892b3 100644 --- a/src/cpp/ext/otel/otel_plugin.cc +++ b/src/cpp/ext/otel/otel_plugin.cc @@ -29,6 +29,7 @@ #include "opentelemetry/metrics/sync_instruments.h" #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/unique_ptr.h" +#include "opentelemetry/nostd/variant.h" #include #include @@ -38,6 +39,7 @@ #include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/cpp/ext/otel/key_value_iterable.h" #include "src/cpp/ext/otel/otel_client_call_tracer.h" @@ -109,13 +111,18 @@ class OpenTelemetryPlugin::NPCMetricsKeyValueIterable return false; } } - for (size_t i = 0; i < optional_label_keys_.size(); ++i) { + // Since we are saving the optional label values as std::string for callback + // gauges, we want to minimize memory usage by filtering out the disabled + // optional label values. + bool filtered = optional_label_values_.size() < optional_label_keys_.size(); + for (size_t i = 0, j = 0; i < optional_label_keys_.size(); ++i) { if (!optional_labels_bits_.test(i)) { + if (!filtered) ++j; continue; } if (!callback( AbslStrViewToOpenTelemetryStrView(optional_label_keys_[i]), - AbslStrViewToOpenTelemetryStrView(optional_label_values_[i]))) { + AbslStrViewToOpenTelemetryStrView(optional_label_values_[j++]))) { return false; } } @@ -236,6 +243,91 @@ absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() { return absl::OkStatus(); } +OpenTelemetryPlugin::CallbackMetricReporter::CallbackMetricReporter( + OpenTelemetryPlugin* ot_plugin, grpc_core::RegisteredMetricCallback* key) + : ot_plugin_(ot_plugin), key_(key) { + // Since we are updating the timestamp and updating the cache for all + // registered instruments in a RegisteredMetricCallback, we will need to + // clear all the cache cells for this RegisteredMetricCallback first, so + // that if a particular combination of labels was previously present but + // is no longer present, we won't continue to report it. + for (const auto& handle : key->metrics()) { + grpc_core::Match( + handle, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackInt64GaugeHandle& handle) { + auto& callback_gauge_state = + absl::get>>( + ot_plugin_->instruments_data_.at(handle.index).instrument); + callback_gauge_state->caches[key].clear(); + }, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackDoubleGaugeHandle& handle) { + auto& callback_gauge_state = + absl::get>>( + ot_plugin_->instruments_data_.at(handle.index).instrument); + callback_gauge_state->caches[key].clear(); + }); + } +} + +void OpenTelemetryPlugin::CallbackMetricReporter::Report( + grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle handle, + int64_t value, absl::Span label_values, + absl::Span optional_values) { + const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index); + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + const auto& descriptor = + grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); + GPR_ASSERT(descriptor.label_keys.size() == label_values.size()); + GPR_ASSERT(descriptor.optional_label_keys.size() == optional_values.size()); + auto& cell = (*callback_gauge_state)->caches.at(key_); + std::vector key; + key.reserve(label_values.size() + + instrument_data.optional_labels_bits.count()); + for (const absl::string_view value : label_values) { + key.emplace_back(value); + } + for (size_t i = 0; i < optional_values.size(); ++i) { + if (instrument_data.optional_labels_bits.test(i)) { + key.emplace_back(optional_values[i]); + } + } + cell.insert_or_assign(std::move(key), value); +} + +void OpenTelemetryPlugin::CallbackMetricReporter::Report( + grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle + handle, + double value, absl::Span label_values, + absl::Span optional_values) { + const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index); + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + const auto& descriptor = + grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle); + GPR_ASSERT(descriptor.label_keys.size() == label_values.size()); + GPR_ASSERT(descriptor.optional_label_keys.size() == optional_values.size()); + auto& cell = (*callback_gauge_state)->caches.at(key_); + std::vector key; + key.reserve(label_values.size() + + instrument_data.optional_labels_bits.count()); + for (const absl::string_view value : label_values) { + key.emplace_back(value); + } + for (size_t i = 0; i < optional_values.size(); ++i) { + if (instrument_data.optional_labels_bits.test(i)) { + key.emplace_back(optional_values[i]); + } + } + cell.insert_or_assign(std::move(key), value); +} + OpenTelemetryPlugin::OpenTelemetryPlugin( const absl::flat_hash_set& metrics, opentelemetry::nostd::shared_ptr @@ -388,26 +480,42 @@ OpenTelemetryPlugin::OpenTelemetryPlugin( descriptor.value_type)); } break; - // TODO(yashkt, yijiem): implement gauges. case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kGauge: - switch (descriptor.value_type) { - case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: - break; - case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: - break; - default: - grpc_core::Crash( - absl::StrFormat("Unknown or unsupported value type: %d", - descriptor.value_type)); - } + grpc_core::Crash( + "Non-callback gauge is not supported and will be deleted in " + "the future."); break; case grpc_core::GlobalInstrumentsRegistry::InstrumentType:: kCallbackGauge: switch (descriptor.value_type) { - case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: + case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: { + auto observable_state = + std::make_unique>(); + observable_state->id = descriptor.index; + observable_state->ot_plugin = this; + observable_state->instrument = + meter->CreateInt64ObservableGauge( + std::string(descriptor.name), + std::string(descriptor.description), + std::string(descriptor.unit)); + instruments_data_[descriptor.index].instrument = + std::move(observable_state); break; - case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: + } + case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: { + auto observable_state = + std::make_unique>(); + observable_state->id = descriptor.index; + observable_state->ot_plugin = this; + observable_state->instrument = + meter->CreateDoubleObservableGauge( + std::string(descriptor.name), + std::string(descriptor.description), + std::string(descriptor.unit)); + instruments_data_[descriptor.index].instrument = + std::move(observable_state); break; + } default: grpc_core::Crash( absl::StrFormat("Unknown or unsupported value type: %d", @@ -435,6 +543,7 @@ OpenTelemetryPlugin::IsEnabledForChannel( } return {false, nullptr}; } + std::pair> OpenTelemetryPlugin::IsEnabledForServer( const grpc_core::ChannelArgs& args) const { @@ -469,6 +578,7 @@ void OpenTelemetryPlugin::AddCounter( descriptor.optional_label_keys, optional_values, instrument_data.optional_labels_bits)); } + void OpenTelemetryPlugin::AddCounter( grpc_core::GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle, double value, absl::Span label_values, @@ -492,6 +602,7 @@ void OpenTelemetryPlugin::AddCounter( descriptor.optional_label_keys, optional_values, instrument_data.optional_labels_bits)); } + void OpenTelemetryPlugin::RecordHistogram( grpc_core::GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle, uint64_t value, absl::Span label_values, @@ -517,6 +628,7 @@ void OpenTelemetryPlugin::RecordHistogram( instrument_data.optional_labels_bits), opentelemetry::context::Context{}); } + void OpenTelemetryPlugin::RecordHistogram( grpc_core::GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle, double value, absl::Span label_values, @@ -543,6 +655,193 @@ void OpenTelemetryPlugin::RecordHistogram( opentelemetry::context::Context{}); } +void OpenTelemetryPlugin::AddCallback( + grpc_core::RegisteredMetricCallback* callback) { + std::vector< + absl::variant*, CallbackGaugeState*>> + gauges_that_need_to_add_callback; + { + grpc_core::MutexLock lock(&mu_); + callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast()); + for (const auto& handle : callback->metrics()) { + grpc_core::Match( + handle, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackInt64GaugeHandle& handle) { + const auto& instrument_data = instruments_data_.at(handle.index); + if (absl::holds_alternative(instrument_data.instrument)) { + // This instrument is disabled. + return; + } + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + (*callback_gauge_state) + ->caches.emplace(callback, + CallbackGaugeState::Cache{}); + if (!std::exchange((*callback_gauge_state)->ot_callback_registered, + true)) { + gauges_that_need_to_add_callback.push_back( + callback_gauge_state->get()); + } + }, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackDoubleGaugeHandle& handle) { + const auto& instrument_data = instruments_data_.at(handle.index); + if (absl::holds_alternative(instrument_data.instrument)) { + // This instrument is disabled. + return; + } + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + (*callback_gauge_state) + ->caches.emplace(callback, CallbackGaugeState::Cache{}); + if (!std::exchange((*callback_gauge_state)->ot_callback_registered, + true)) { + gauges_that_need_to_add_callback.push_back( + callback_gauge_state->get()); + } + }); + } + } + // AddCallback internally grabs OpenTelemetry's observable_registry's lock. So + // we need to call it without our plugin lock otherwise we may deadlock. + for (const auto& gauge : gauges_that_need_to_add_callback) { + grpc_core::Match( + gauge, + [](CallbackGaugeState* gauge) { + gauge->instrument->AddCallback( + &CallbackGaugeState::CallbackGaugeCallback, gauge); + }, + [](CallbackGaugeState* gauge) { + gauge->instrument->AddCallback( + &CallbackGaugeState::CallbackGaugeCallback, gauge); + }); + } +} + +void OpenTelemetryPlugin::RemoveCallback( + grpc_core::RegisteredMetricCallback* callback) { + std::vector< + absl::variant*, CallbackGaugeState*>> + gauges_that_need_to_remove_callback; + { + grpc_core::MutexLock lock(&mu_); + callback_timestamps_.erase(callback); + for (const auto& handle : callback->metrics()) { + grpc_core::Match( + handle, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackInt64GaugeHandle& handle) { + const auto& instrument_data = instruments_data_.at(handle.index); + if (absl::holds_alternative(instrument_data.instrument)) { + // This instrument is disabled. + return; + } + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + GPR_ASSERT((*callback_gauge_state)->ot_callback_registered); + GPR_ASSERT((*callback_gauge_state)->caches.erase(callback) == 1); + if ((*callback_gauge_state)->caches.empty()) { + gauges_that_need_to_remove_callback.push_back( + callback_gauge_state->get()); + (*callback_gauge_state)->ot_callback_registered = false; + } + }, + [&](const grpc_core::GlobalInstrumentsRegistry:: + GlobalCallbackDoubleGaugeHandle& handle) { + const auto& instrument_data = instruments_data_.at(handle.index); + if (absl::holds_alternative(instrument_data.instrument)) { + // This instrument is disabled. + return; + } + auto* callback_gauge_state = + absl::get_if>>( + &instrument_data.instrument); + GPR_ASSERT(callback_gauge_state != nullptr); + GPR_ASSERT((*callback_gauge_state)->ot_callback_registered); + GPR_ASSERT((*callback_gauge_state)->caches.erase(callback) == 1); + if ((*callback_gauge_state)->caches.empty()) { + gauges_that_need_to_remove_callback.push_back( + callback_gauge_state->get()); + (*callback_gauge_state)->ot_callback_registered = false; + } + }); + } + } + // RemoveCallback internally grabs OpenTelemetry's observable_registry's lock. + // So we need to call it without our plugin lock otherwise we may deadlock. + for (const auto& gauge : gauges_that_need_to_remove_callback) { + grpc_core::Match( + gauge, + [](CallbackGaugeState* gauge) { + gauge->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, gauge); + }, + [](CallbackGaugeState* gauge) { + gauge->instrument->RemoveCallback( + &CallbackGaugeState::CallbackGaugeCallback, gauge); + }); + } +} + +template +void OpenTelemetryPlugin::CallbackGaugeState::Observe( + opentelemetry::metrics::ObserverResult& result, const Cache& cache) { + const auto& descriptor = + grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id}); + for (const auto& pair : cache) { + GPR_ASSERT(pair.first.size() <= (descriptor.label_keys.size() + + descriptor.optional_label_keys.size())); + auto& instrument_data = ot_plugin->instruments_data_.at(id); + opentelemetry::nostd::get>>(result) + ->Observe(pair.second, + NPCMetricsKeyValueIterable( + descriptor.label_keys, + absl::FixedArray( + pair.first.begin(), + pair.first.begin() + descriptor.label_keys.size()), + descriptor.optional_label_keys, + absl::FixedArray( + pair.first.begin() + descriptor.label_keys.size(), + pair.first.end()), + instrument_data.optional_labels_bits)); + } +} + +// OpenTelemetry calls our callback with its observable_registry's lock held. +template +void OpenTelemetryPlugin::CallbackGaugeState::CallbackGaugeCallback( + opentelemetry::metrics::ObserverResult result, void* arg) { + auto* callback_gauge_state = static_cast*>(arg); + auto now = grpc_core::Timestamp::Now(); + grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_); + for (auto& elem : callback_gauge_state->caches) { + auto* registered_metric_callback = elem.first; + auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find( + registered_metric_callback); + GPR_ASSERT(iter != + callback_gauge_state->ot_plugin->callback_timestamps_.end()); + if (now - iter->second < registered_metric_callback->min_interval()) { + // Use cached value. + callback_gauge_state->Observe(result, elem.second); + continue; + } + // Otherwise update and use the cache. + iter->second = now; + CallbackMetricReporter reporter(callback_gauge_state->ot_plugin, + registered_metric_callback); + registered_metric_callback->Run(reporter); + callback_gauge_state->Observe(result, elem.second); + } +} + grpc_core::ClientCallTracer* OpenTelemetryPlugin::GetClientCallTracer( const grpc_core::Slice& path, bool registered_method, std::shared_ptr scope_config) { @@ -553,6 +852,7 @@ grpc_core::ClientCallTracer* OpenTelemetryPlugin::GetClientCallTracer( std::static_pointer_cast( scope_config)); } + grpc_core::ServerCallTracer* OpenTelemetryPlugin::GetServerCallTracer( std::shared_ptr scope_config) { return grpc_core::GetContext() diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h index 2a1ba0e144f..3468e167632 100644 --- a/src/cpp/ext/otel/otel_plugin.h +++ b/src/cpp/ext/otel/otel_plugin.h @@ -29,11 +29,14 @@ #include #include +#include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/functional/any_invocable.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "opentelemetry/metrics/async_instruments.h" #include "opentelemetry/metrics/meter_provider.h" +#include "opentelemetry/metrics/observer_result.h" #include "opentelemetry/metrics/sync_instruments.h" #include "opentelemetry/nostd/shared_ptr.h" @@ -335,6 +338,33 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { } call; }; + // This object should be used inline. + class CallbackMetricReporter : public grpc_core::CallbackMetricReporter { + public: + CallbackMetricReporter(OpenTelemetryPlugin* ot_plugin, + grpc_core::RegisteredMetricCallback* key) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_); + + void Report( + grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle + handle, + int64_t value, absl::Span label_values, + absl::Span optional_values) + ABSL_EXCLUSIVE_LOCKS_REQUIRED( + CallbackGaugeState::ot_plugin->mu_) override; + void Report( + grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle + handle, + double value, absl::Span label_values, + absl::Span optional_values) + ABSL_EXCLUSIVE_LOCKS_REQUIRED( + CallbackGaugeState::ot_plugin->mu_) override; + + private: + OpenTelemetryPlugin* ot_plugin_; + grpc_core::RegisteredMetricCallback* key_; + }; + // StatsPlugin: std::pair> IsEnabledForChannel( @@ -365,11 +395,10 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { grpc_core::GlobalInstrumentsRegistry::GlobalDoubleGaugeHandle /*handle*/, double /*value*/, absl::Span /*label_values*/, absl::Span /*optional_values*/) override {} - // TODO(yashkt, yijiem): implement async instrument. - void AddCallback(grpc_core::RegisteredMetricCallback* /*callback*/) override { - } - void RemoveCallback( - grpc_core::RegisteredMetricCallback* /*callback*/) override {} + void AddCallback(grpc_core::RegisteredMetricCallback* callback) + ABSL_LOCKS_EXCLUDED(mu_) override; + void RemoveCallback(grpc_core::RegisteredMetricCallback* callback) + ABSL_LOCKS_EXCLUDED(mu_) override; grpc_core::ClientCallTracer* GetClientCallTracer( const grpc_core::Slice& path, bool registered_method, std::shared_ptr scope_config) @@ -395,6 +424,34 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { return plugin_options_; } + template + struct CallbackGaugeState { + // It's possible to set values for multiple sets of labels at the same time + // in a single callback. Key is a vector of label values and enabled + // optional label values. + using Cache = absl::flat_hash_map, ValueType>; + grpc_core::GlobalInstrumentsRegistry::InstrumentID id; + opentelemetry::nostd::shared_ptr< + opentelemetry::metrics::ObservableInstrument> + instrument; + bool ot_callback_registered ABSL_GUARDED_BY(ot_plugin->mu_); + // instrument1 ----- RegisteredMetricCallback1 + // x + // instrument2 ----- RegisteredMetricCallback2 + // One instrument can be registered by multiple callbacks. + absl::flat_hash_map caches + ABSL_GUARDED_BY(ot_plugin->mu_); + OpenTelemetryPlugin* ot_plugin; + + static void CallbackGaugeCallback( + opentelemetry::metrics::ObserverResult result, void* arg) + ABSL_LOCKS_EXCLUDED(ot_plugin->mu_); + + void Observe(opentelemetry::metrics::ObserverResult& result, + const Cache& cache) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_); + }; + // Instruments for per-call metrics. ClientMetrics client_; ServerMetrics server_; @@ -404,7 +461,9 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { Disabled, std::unique_ptr>, std::unique_ptr>, std::unique_ptr>, - std::unique_ptr>>; + std::unique_ptr>, + std::unique_ptr>, + std::unique_ptr>>; static constexpr int kOptionalLabelsSizeLimit = 64; using OptionalLabelsBitSet = std::bitset; struct InstrumentData { @@ -412,6 +471,10 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { OptionalLabelsBitSet optional_labels_bits; }; std::vector instruments_data_; + grpc_core::Mutex mu_; + absl::flat_hash_map + callback_timestamps_ ABSL_GUARDED_BY(mu_); opentelemetry::nostd::shared_ptr meter_provider_; absl::AnyInvocable diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc index a39200b1757..7c244f50713 100644 --- a/test/cpp/ext/otel/otel_plugin_test.cc +++ b/test/cpp/ext/otel/otel_plugin_test.cc @@ -18,11 +18,15 @@ #include "src/cpp/ext/otel/otel_plugin.h" +#include +#include +#include #include #include "absl/functional/any_invocable.h" #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "opentelemetry/common/timestamp.h" #include "opentelemetry/metrics/provider.h" #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/attribute_utils.h" @@ -70,46 +74,129 @@ MATCHER_P4(AttributesEq, label_keys, label_values, optional_label_keys, } template -auto IntOrDoubleEq(T result) { - return ::testing::Eq(result); -} -template <> -auto IntOrDoubleEq(double result) { - return ::testing::DoubleEq(result); -} +struct Extract; + +template