Merge branch 'master' into endpoint-infusion

pull/35957/head
AJ Heller 11 months ago
commit caa1d1f761
  1. 3
      CMakeLists.txt
  2. 1
      build_autogenerated.yaml
  3. 3
      src/core/BUILD
  4. 14
      src/core/lib/channel/metrics.h
  5. 8
      src/core/lib/experiments/experiments.yaml
  6. 2
      src/cpp/ext/otel/BUILD
  7. 330
      src/cpp/ext/otel/otel_plugin.cc
  8. 75
      src/cpp/ext/otel/otel_plugin.h
  9. 9
      src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.client
  10. 9
      src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server
  11. 2
      templates/CMakeLists.txt.template
  12. 3
      test/core/service_config/service_config_test.cc
  13. 3
      test/core/surface/channel_init_test.cc
  14. 479
      test/cpp/ext/otel/otel_plugin_test.cc
  15. 48
      test/cpp/ext/otel/otel_test_library.cc
  16. 26
      test/cpp/ext/otel/otel_test_library.h
  17. 1
      test/cpp/grpclb/BUILD
  18. 5
      test/cpp/grpclb/grpclb_api_test.cc
  19. 1
      tools/distrib/sanitize.sh
  20. 7
      tools/internal_ci/linux/psm-csm-python.sh
  21. 1
      tools/interop_matrix/client_matrix.py

3
CMakeLists.txt generated

@ -309,6 +309,8 @@ if(MSVC)
set(_gRPC_C_CXX_FLAGS "${_gRPC_C_CXX_FLAGS} /utf-8")
# Inconsistent object sizes can cause stack corruption and should be treated as an error
set(_gRPC_C_CXX_FLAGS "${_gRPC_C_CXX_FLAGS} /we4789")
# To decrease the size of PDB files
set(CMAKE_EXE_LINKER_FLAGS "/opt:ref /opt:icf /pdbcompress")
endif()
if (MINGW)
add_definitions(-D_WIN32_WINNT=0x600)
@ -16137,6 +16139,7 @@ target_include_directories(grpclb_api_test
target_link_libraries(grpclb_api_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
absl::check
grpc++_test_util
)

@ -10855,6 +10855,7 @@ targets:
- test/cpp/grpclb/grpclb_api_test.cc
deps:
- gtest
- absl/log:check
- grpc++_test_util
- name: grpclb_end2end_test
gtest: true

@ -1207,6 +1207,7 @@ grpc_cc_library(
public_hdrs = [
"lib/transport/handshaker_factory.h",
],
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"channel_args",
"iomgr_fwd",
@ -1223,6 +1224,7 @@ grpc_cc_library(
public_hdrs = [
"lib/transport/handshaker_registry.h",
],
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"channel_args",
"handshaker_factory",
@ -1627,6 +1629,7 @@ grpc_cc_library(
hdrs = [
"lib/iomgr/iomgr_fwd.h",
],
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = ["//:gpr_platform"],
)

@ -87,9 +87,10 @@ class GlobalInstrumentsRegistry {
struct GlobalDoubleHistogramHandle : public GlobalInstrumentHandle {};
struct GlobalInt64GaugeHandle : public GlobalInstrumentHandle {};
struct GlobalDoubleGaugeHandle : public GlobalInstrumentHandle {};
struct GlobalCallbackHandle : public GlobalInstrumentHandle {};
struct GlobalCallbackInt64GaugeHandle : public GlobalCallbackHandle {};
struct GlobalCallbackDoubleGaugeHandle : public GlobalCallbackHandle {};
struct GlobalCallbackInt64GaugeHandle : public GlobalInstrumentHandle {};
struct GlobalCallbackDoubleGaugeHandle : public GlobalInstrumentHandle {};
using GlobalCallbackHandle = absl::variant<GlobalCallbackInt64GaugeHandle,
GlobalCallbackDoubleGaugeHandle>;
// Creates instrument in the GlobalInstrumentsRegistry.
static GlobalUInt64CounterHandle RegisterUInt64Counter(
@ -317,13 +318,16 @@ class GlobalStatsPluginRegistry {
// Registers a callback to be used to populate callback metrics.
// The callback will update the specified metrics. The callback
// will be invoked no more often than min_interval.
// will be invoked no more often than min_interval. Multiple callbacks may
// be registered for the same metrics, as long as no two callbacks report
// data for the same set of labels in which case the behavior is undefined.
//
// The returned object is a handle that allows the caller to control
// the lifetime of the callback; when the returned object is
// destroyed, the callback is de-registered. The returned object
// must not outlive the StatsPluginGroup object that created it.
std::unique_ptr<RegisteredMetricCallback> RegisterCallback(
GRPC_MUST_USE_RESULT std::unique_ptr<RegisteredMetricCallback>
RegisterCallback(
absl::AnyInvocable<void(CallbackMetricReporter&)> callback,
std::vector<GlobalInstrumentsRegistry::GlobalCallbackHandle> metrics,
Duration min_interval = Duration::Seconds(5));

@ -55,7 +55,7 @@
- name: canary_client_privacy
description:
If set, canary client privacy
expiry: 2024/04/01
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: []
allow_in_fuzzing_config: false
@ -69,7 +69,7 @@
- name: client_privacy
description:
If set, client privacy
expiry: 2024/04/01
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: []
allow_in_fuzzing_config: false
@ -95,7 +95,7 @@
uses_polling: true
- name: free_large_allocator
description: If set, return all free bytes from a "big" allocator
expiry: 2024/04/01
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: [resource_quota_test]
- name: http2_stats_fix
@ -187,7 +187,7 @@
- name: server_privacy
description:
If set, server privacy
expiry: 2024/04/01
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: []
allow_in_fuzzing_config: false

@ -44,6 +44,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
"absl/status",
@ -75,6 +76,7 @@ grpc_cc_library(
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:match",
"//src/core:metadata_batch",
"//src/core:metrics",
"//src/core:slice",

@ -29,6 +29,7 @@
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/unique_ptr.h"
#include "opentelemetry/nostd/variant.h"
#include <grpc/support/log.h>
#include <grpcpp/ext/otel_plugin.h>
@ -38,6 +39,7 @@
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/ext/otel/key_value_iterable.h"
#include "src/cpp/ext/otel/otel_client_call_tracer.h"
@ -109,13 +111,18 @@ class OpenTelemetryPlugin::NPCMetricsKeyValueIterable
return false;
}
}
for (size_t i = 0; i < optional_label_keys_.size(); ++i) {
// Since we are saving the optional label values as std::string for callback
// gauges, we want to minimize memory usage by filtering out the disabled
// optional label values.
bool filtered = optional_label_values_.size() < optional_label_keys_.size();
for (size_t i = 0, j = 0; i < optional_label_keys_.size(); ++i) {
if (!optional_labels_bits_.test(i)) {
if (!filtered) ++j;
continue;
}
if (!callback(
AbslStrViewToOpenTelemetryStrView(optional_label_keys_[i]),
AbslStrViewToOpenTelemetryStrView(optional_label_values_[i]))) {
AbslStrViewToOpenTelemetryStrView(optional_label_values_[j++]))) {
return false;
}
}
@ -236,6 +243,91 @@ absl::Status OpenTelemetryPluginBuilderImpl::BuildAndRegisterGlobal() {
return absl::OkStatus();
}
OpenTelemetryPlugin::CallbackMetricReporter::CallbackMetricReporter(
OpenTelemetryPlugin* ot_plugin, grpc_core::RegisteredMetricCallback* key)
: ot_plugin_(ot_plugin), key_(key) {
// Since we are updating the timestamp and updating the cache for all
// registered instruments in a RegisteredMetricCallback, we will need to
// clear all the cache cells for this RegisteredMetricCallback first, so
// that if a particular combination of labels was previously present but
// is no longer present, we won't continue to report it.
for (const auto& handle : key->metrics()) {
grpc_core::Match(
handle,
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackInt64GaugeHandle& handle) {
auto& callback_gauge_state =
absl::get<std::unique_ptr<CallbackGaugeState<int64_t>>>(
ot_plugin_->instruments_data_.at(handle.index).instrument);
callback_gauge_state->caches[key].clear();
},
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackDoubleGaugeHandle& handle) {
auto& callback_gauge_state =
absl::get<std::unique_ptr<CallbackGaugeState<double>>>(
ot_plugin_->instruments_data_.at(handle.index).instrument);
callback_gauge_state->caches[key].clear();
});
}
}
void OpenTelemetryPlugin::CallbackMetricReporter::Report(
grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle handle,
int64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
const auto& descriptor =
grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
GPR_ASSERT(descriptor.label_keys.size() == label_values.size());
GPR_ASSERT(descriptor.optional_label_keys.size() == optional_values.size());
auto& cell = (*callback_gauge_state)->caches.at(key_);
std::vector<std::string> key;
key.reserve(label_values.size() +
instrument_data.optional_labels_bits.count());
for (const absl::string_view value : label_values) {
key.emplace_back(value);
}
for (size_t i = 0; i < optional_values.size(); ++i) {
if (instrument_data.optional_labels_bits.test(i)) {
key.emplace_back(optional_values[i]);
}
}
cell.insert_or_assign(std::move(key), value);
}
void OpenTelemetryPlugin::CallbackMetricReporter::Report(
grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle
handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values) {
const auto& instrument_data = ot_plugin_->instruments_data_.at(handle.index);
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
const auto& descriptor =
grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor(handle);
GPR_ASSERT(descriptor.label_keys.size() == label_values.size());
GPR_ASSERT(descriptor.optional_label_keys.size() == optional_values.size());
auto& cell = (*callback_gauge_state)->caches.at(key_);
std::vector<std::string> key;
key.reserve(label_values.size() +
instrument_data.optional_labels_bits.count());
for (const absl::string_view value : label_values) {
key.emplace_back(value);
}
for (size_t i = 0; i < optional_values.size(); ++i) {
if (instrument_data.optional_labels_bits.test(i)) {
key.emplace_back(optional_values[i]);
}
}
cell.insert_or_assign(std::move(key), value);
}
OpenTelemetryPlugin::OpenTelemetryPlugin(
const absl::flat_hash_set<std::string>& metrics,
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
@ -388,26 +480,42 @@ OpenTelemetryPlugin::OpenTelemetryPlugin(
descriptor.value_type));
}
break;
// TODO(yashkt, yijiem): implement gauges.
case grpc_core::GlobalInstrumentsRegistry::InstrumentType::kGauge:
switch (descriptor.value_type) {
case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64:
break;
case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
break;
default:
grpc_core::Crash(
absl::StrFormat("Unknown or unsupported value type: %d",
descriptor.value_type));
}
grpc_core::Crash(
"Non-callback gauge is not supported and will be deleted in "
"the future.");
break;
case grpc_core::GlobalInstrumentsRegistry::InstrumentType::
kCallbackGauge:
switch (descriptor.value_type) {
case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64:
case grpc_core::GlobalInstrumentsRegistry::ValueType::kInt64: {
auto observable_state =
std::make_unique<CallbackGaugeState<int64_t>>();
observable_state->id = descriptor.index;
observable_state->ot_plugin = this;
observable_state->instrument =
meter->CreateInt64ObservableGauge(
std::string(descriptor.name),
std::string(descriptor.description),
std::string(descriptor.unit));
instruments_data_[descriptor.index].instrument =
std::move(observable_state);
break;
case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble:
}
case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
auto observable_state =
std::make_unique<CallbackGaugeState<double>>();
observable_state->id = descriptor.index;
observable_state->ot_plugin = this;
observable_state->instrument =
meter->CreateDoubleObservableGauge(
std::string(descriptor.name),
std::string(descriptor.description),
std::string(descriptor.unit));
instruments_data_[descriptor.index].instrument =
std::move(observable_state);
break;
}
default:
grpc_core::Crash(
absl::StrFormat("Unknown or unsupported value type: %d",
@ -435,6 +543,7 @@ OpenTelemetryPlugin::IsEnabledForChannel(
}
return {false, nullptr};
}
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPlugin::IsEnabledForServer(
const grpc_core::ChannelArgs& args) const {
@ -469,6 +578,7 @@ void OpenTelemetryPlugin::AddCounter(
descriptor.optional_label_keys, optional_values,
instrument_data.optional_labels_bits));
}
void OpenTelemetryPlugin::AddCounter(
grpc_core::GlobalInstrumentsRegistry::GlobalDoubleCounterHandle handle,
double value, absl::Span<const absl::string_view> label_values,
@ -492,6 +602,7 @@ void OpenTelemetryPlugin::AddCounter(
descriptor.optional_label_keys, optional_values,
instrument_data.optional_labels_bits));
}
void OpenTelemetryPlugin::RecordHistogram(
grpc_core::GlobalInstrumentsRegistry::GlobalUInt64HistogramHandle handle,
uint64_t value, absl::Span<const absl::string_view> label_values,
@ -517,6 +628,7 @@ void OpenTelemetryPlugin::RecordHistogram(
instrument_data.optional_labels_bits),
opentelemetry::context::Context{});
}
void OpenTelemetryPlugin::RecordHistogram(
grpc_core::GlobalInstrumentsRegistry::GlobalDoubleHistogramHandle handle,
double value, absl::Span<const absl::string_view> label_values,
@ -543,6 +655,193 @@ void OpenTelemetryPlugin::RecordHistogram(
opentelemetry::context::Context{});
}
void OpenTelemetryPlugin::AddCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
gauges_that_need_to_add_callback;
{
grpc_core::MutexLock lock(&mu_);
callback_timestamps_.emplace(callback, grpc_core::Timestamp::InfPast());
for (const auto& handle : callback->metrics()) {
grpc_core::Match(
handle,
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackInt64GaugeHandle& handle) {
const auto& instrument_data = instruments_data_.at(handle.index);
if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
// This instrument is disabled.
return;
}
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
(*callback_gauge_state)
->caches.emplace(callback,
CallbackGaugeState<int64_t>::Cache{});
if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
true)) {
gauges_that_need_to_add_callback.push_back(
callback_gauge_state->get());
}
},
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackDoubleGaugeHandle& handle) {
const auto& instrument_data = instruments_data_.at(handle.index);
if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
// This instrument is disabled.
return;
}
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
(*callback_gauge_state)
->caches.emplace(callback, CallbackGaugeState<double>::Cache{});
if (!std::exchange((*callback_gauge_state)->ot_callback_registered,
true)) {
gauges_that_need_to_add_callback.push_back(
callback_gauge_state->get());
}
});
}
}
// AddCallback internally grabs OpenTelemetry's observable_registry's lock. So
// we need to call it without our plugin lock otherwise we may deadlock.
for (const auto& gauge : gauges_that_need_to_add_callback) {
grpc_core::Match(
gauge,
[](CallbackGaugeState<int64_t>* gauge) {
gauge->instrument->AddCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
},
[](CallbackGaugeState<double>* gauge) {
gauge->instrument->AddCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
});
}
}
void OpenTelemetryPlugin::RemoveCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
gauges_that_need_to_remove_callback;
{
grpc_core::MutexLock lock(&mu_);
callback_timestamps_.erase(callback);
for (const auto& handle : callback->metrics()) {
grpc_core::Match(
handle,
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackInt64GaugeHandle& handle) {
const auto& instrument_data = instruments_data_.at(handle.index);
if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
// This instrument is disabled.
return;
}
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<int64_t>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
GPR_ASSERT((*callback_gauge_state)->ot_callback_registered);
GPR_ASSERT((*callback_gauge_state)->caches.erase(callback) == 1);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
},
[&](const grpc_core::GlobalInstrumentsRegistry::
GlobalCallbackDoubleGaugeHandle& handle) {
const auto& instrument_data = instruments_data_.at(handle.index);
if (absl::holds_alternative<Disabled>(instrument_data.instrument)) {
// This instrument is disabled.
return;
}
auto* callback_gauge_state =
absl::get_if<std::unique_ptr<CallbackGaugeState<double>>>(
&instrument_data.instrument);
GPR_ASSERT(callback_gauge_state != nullptr);
GPR_ASSERT((*callback_gauge_state)->ot_callback_registered);
GPR_ASSERT((*callback_gauge_state)->caches.erase(callback) == 1);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
});
}
}
// RemoveCallback internally grabs OpenTelemetry's observable_registry's lock.
// So we need to call it without our plugin lock otherwise we may deadlock.
for (const auto& gauge : gauges_that_need_to_remove_callback) {
grpc_core::Match(
gauge,
[](CallbackGaugeState<int64_t>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
},
[](CallbackGaugeState<double>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
});
}
}
template <typename ValueType>
void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::Observe(
opentelemetry::metrics::ObserverResult& result, const Cache& cache) {
const auto& descriptor =
grpc_core::GlobalInstrumentsRegistry::GetInstrumentDescriptor({id});
for (const auto& pair : cache) {
GPR_ASSERT(pair.first.size() <= (descriptor.label_keys.size() +
descriptor.optional_label_keys.size()));
auto& instrument_data = ot_plugin->instruments_data_.at(id);
opentelemetry::nostd::get<opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObserverResultT<ValueType>>>(result)
->Observe(pair.second,
NPCMetricsKeyValueIterable(
descriptor.label_keys,
absl::FixedArray<absl::string_view>(
pair.first.begin(),
pair.first.begin() + descriptor.label_keys.size()),
descriptor.optional_label_keys,
absl::FixedArray<absl::string_view>(
pair.first.begin() + descriptor.label_keys.size(),
pair.first.end()),
instrument_data.optional_labels_bits));
}
}
// OpenTelemetry calls our callback with its observable_registry's lock held.
template <typename ValueType>
void OpenTelemetryPlugin::CallbackGaugeState<ValueType>::CallbackGaugeCallback(
opentelemetry::metrics::ObserverResult result, void* arg) {
auto* callback_gauge_state = static_cast<CallbackGaugeState<ValueType>*>(arg);
auto now = grpc_core::Timestamp::Now();
grpc_core::MutexLock plugin_lock(&callback_gauge_state->ot_plugin->mu_);
for (auto& elem : callback_gauge_state->caches) {
auto* registered_metric_callback = elem.first;
auto iter = callback_gauge_state->ot_plugin->callback_timestamps_.find(
registered_metric_callback);
GPR_ASSERT(iter !=
callback_gauge_state->ot_plugin->callback_timestamps_.end());
if (now - iter->second < registered_metric_callback->min_interval()) {
// Use cached value.
callback_gauge_state->Observe(result, elem.second);
continue;
}
// Otherwise update and use the cache.
iter->second = now;
CallbackMetricReporter reporter(callback_gauge_state->ot_plugin,
registered_metric_callback);
registered_metric_callback->Run(reporter);
callback_gauge_state->Observe(result, elem.second);
}
}
grpc_core::ClientCallTracer* OpenTelemetryPlugin::GetClientCallTracer(
const grpc_core::Slice& path, bool registered_method,
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
@ -553,6 +852,7 @@ grpc_core::ClientCallTracer* OpenTelemetryPlugin::GetClientCallTracer(
std::static_pointer_cast<OpenTelemetryPlugin::ClientScopeConfig>(
scope_config));
}
grpc_core::ServerCallTracer* OpenTelemetryPlugin::GetServerCallTracer(
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config) {
return grpc_core::GetContext<grpc_core::Arena>()

@ -29,11 +29,14 @@
#include <string>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "opentelemetry/metrics/async_instruments.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/observer_result.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "opentelemetry/nostd/shared_ptr.h"
@ -335,6 +338,33 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
} call;
};
// This object should be used inline.
class CallbackMetricReporter : public grpc_core::CallbackMetricReporter {
public:
CallbackMetricReporter(OpenTelemetryPlugin* ot_plugin,
grpc_core::RegisteredMetricCallback* key)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);
void Report(
grpc_core::GlobalInstrumentsRegistry::GlobalCallbackInt64GaugeHandle
handle,
int64_t value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(
CallbackGaugeState<int64_t>::ot_plugin->mu_) override;
void Report(
grpc_core::GlobalInstrumentsRegistry::GlobalCallbackDoubleGaugeHandle
handle,
double value, absl::Span<const absl::string_view> label_values,
absl::Span<const absl::string_view> optional_values)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(
CallbackGaugeState<double>::ot_plugin->mu_) override;
private:
OpenTelemetryPlugin* ot_plugin_;
grpc_core::RegisteredMetricCallback* key_;
};
// StatsPlugin:
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForChannel(
@ -365,11 +395,10 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
grpc_core::GlobalInstrumentsRegistry::GlobalDoubleGaugeHandle /*handle*/,
double /*value*/, absl::Span<const absl::string_view> /*label_values*/,
absl::Span<const absl::string_view> /*optional_values*/) override {}
// TODO(yashkt, yijiem): implement async instrument.
void AddCallback(grpc_core::RegisteredMetricCallback* /*callback*/) override {
}
void RemoveCallback(
grpc_core::RegisteredMetricCallback* /*callback*/) override {}
void AddCallback(grpc_core::RegisteredMetricCallback* callback)
ABSL_LOCKS_EXCLUDED(mu_) override;
void RemoveCallback(grpc_core::RegisteredMetricCallback* callback)
ABSL_LOCKS_EXCLUDED(mu_) override;
grpc_core::ClientCallTracer* GetClientCallTracer(
const grpc_core::Slice& path, bool registered_method,
std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig> scope_config)
@ -395,6 +424,34 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
return plugin_options_;
}
template <typename ValueType>
struct CallbackGaugeState {
// It's possible to set values for multiple sets of labels at the same time
// in a single callback. Key is a vector of label values and enabled
// optional label values.
using Cache = absl::flat_hash_map<std::vector<std::string>, ValueType>;
grpc_core::GlobalInstrumentsRegistry::InstrumentID id;
opentelemetry::nostd::shared_ptr<
opentelemetry::metrics::ObservableInstrument>
instrument;
bool ot_callback_registered ABSL_GUARDED_BY(ot_plugin->mu_);
// instrument1 ----- RegisteredMetricCallback1
// x
// instrument2 ----- RegisteredMetricCallback2
// One instrument can be registered by multiple callbacks.
absl::flat_hash_map<grpc_core::RegisteredMetricCallback*, Cache> caches
ABSL_GUARDED_BY(ot_plugin->mu_);
OpenTelemetryPlugin* ot_plugin;
static void CallbackGaugeCallback(
opentelemetry::metrics::ObserverResult result, void* arg)
ABSL_LOCKS_EXCLUDED(ot_plugin->mu_);
void Observe(opentelemetry::metrics::ObserverResult& result,
const Cache& cache)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(ot_plugin->mu_);
};
// Instruments for per-call metrics.
ClientMetrics client_;
ServerMetrics server_;
@ -404,7 +461,9 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
Disabled, std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>,
std::unique_ptr<opentelemetry::metrics::Counter<double>>,
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>,
std::unique_ptr<opentelemetry::metrics::Histogram<double>>>;
std::unique_ptr<opentelemetry::metrics::Histogram<double>>,
std::unique_ptr<CallbackGaugeState<int64_t>>,
std::unique_ptr<CallbackGaugeState<double>>>;
static constexpr int kOptionalLabelsSizeLimit = 64;
using OptionalLabelsBitSet = std::bitset<kOptionalLabelsSizeLimit>;
struct InstrumentData {
@ -412,6 +471,10 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
OptionalLabelsBitSet optional_labels_bits;
};
std::vector<InstrumentData> instruments_data_;
grpc_core::Mutex mu_;
absl::flat_hash_map<grpc_core::RegisteredMetricCallback*,
grpc_core::Timestamp>
callback_timestamps_ ABSL_GUARDED_BY(mu_);
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider_;
absl::AnyInvocable<bool(const grpc_core::ChannelArgs& /*args*/) const>

@ -11,6 +11,10 @@ COPY . .
RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_client
RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client* /artifacts/
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
FROM python:3.9-slim-bookworm
ENV GRPC_VERBOSITY="DEBUG"
@ -25,4 +29,7 @@ RUN ln -s /usr/bin/python3 /usr/bin/python
COPY --from=0 /artifacts ./
ENTRYPOINT ["/xds_interop_client"]
# tini serves as PID 1 and enables the server to properly respond to signals.
COPY --from=0 /tini /tini
ENTRYPOINT ["/tini", "-g", "-vv", "--", "/xds_interop_client"]

@ -11,6 +11,10 @@ COPY . .
RUN tools/bazel build -c dbg //src/python/grpcio_tests/tests_py3_only/interop:xds_interop_server
RUN cp -rL /workdir/bazel-bin/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server* /artifacts/
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini
RUN chmod +x /tini
FROM python:3.9-slim-bookworm
ENV GRPC_VERBOSITY="DEBUG"
@ -25,4 +29,7 @@ RUN ln -s /usr/bin/python3 /usr/bin/python
COPY --from=0 /artifacts ./
ENTRYPOINT ["/xds_interop_server"]
# tini serves as PID 1 and enables the server to properly respond to signals.
COPY --from=0 /tini /tini
ENTRYPOINT ["/tini", "-g", "-vv", "--", "/xds_interop_server"]

@ -424,6 +424,8 @@
set(_gRPC_C_CXX_FLAGS "<%text>${_gRPC_C_CXX_FLAGS}</%text> /utf-8")
# Inconsistent object sizes can cause stack corruption and should be treated as an error
set(_gRPC_C_CXX_FLAGS "<%text>${_gRPC_C_CXX_FLAGS}</%text> /we4789")
# To decrease the size of PDB files
set(CMAKE_EXE_LINKER_FLAGS "/opt:ref /opt:icf /pdbcompress")
endif()
if (MINGW)
add_definitions(-D_WIN32_WINNT=0x600)

@ -368,7 +368,8 @@ TEST_F(ServiceConfigTest, Parser2ErrorInvalidValue) {
<< service_config.status();
}
TEST(ServiceConfigParserTest, DoubleRegistration) {
TEST(ServiceConfigParserDeathTest, DoubleRegistration) {
GTEST_FLAG_SET(death_test_style, "threadsafe");
CoreConfiguration::Reset();
ASSERT_DEATH_IF_SUPPORTED(
CoreConfiguration::WithSubstituteBuilder builder(

@ -174,7 +174,8 @@ TEST(ChannelInitTest, CanAddBeforeAllOnce) {
std::vector<std::string>({"foo", "bar", "baz", "aaa"}));
}
TEST(ChannelInitTest, CanAddBeforeAllTwice) {
TEST(ChannelInitDeathTest, CanAddBeforeAllTwice) {
GTEST_FLAG_SET(death_test_style, "threadsafe");
ChannelInit::Builder b;
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("foo")).BeforeAll();
b.RegisterFilter(GRPC_CLIENT_CHANNEL, FilterNamed("bar")).BeforeAll();

@ -18,11 +18,15 @@
#include "src/cpp/ext/otel/otel_plugin.h"
#include <atomic>
#include <chrono>
#include <ratio>
#include <type_traits>
#include "absl/functional/any_invocable.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/metrics/provider.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/attribute_utils.h"
@ -70,46 +74,129 @@ MATCHER_P4(AttributesEq, label_keys, label_values, optional_label_keys,
}
template <typename T>
auto IntOrDoubleEq(T result) {
return ::testing::Eq(result);
}
template <>
auto IntOrDoubleEq(double result) {
return ::testing::DoubleEq(result);
}
struct Extract;
template <template <typename> class T, typename U>
struct Extract<const T<U>> {
using Type = U;
};
MATCHER_P(CounterResultEq, result, "") {
MATCHER_P(CounterResultEq, value_matcher, "") {
return ::testing::ExplainMatchResult(
::testing::VariantWith<opentelemetry::sdk::metrics::SumPointData>(
::testing::Field(&opentelemetry::sdk::metrics::SumPointData::value_,
::testing::VariantWith<
typename Extract<decltype(value_matcher)>::Type>(
value_matcher))),
arg.point_data, result_listener);
}
MATCHER_P4(HistogramResultEq, sum_matcher, min_matcher, max_matcher, count,
"") {
return ::testing::ExplainMatchResult(
::testing::VariantWith<
opentelemetry::sdk::metrics::HistogramPointData>(::testing::AllOf(
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::sum_,
::testing::VariantWith<
typename Extract<decltype(sum_matcher)>::Type>(sum_matcher)),
::testing::Field(
&opentelemetry::sdk::metrics::SumPointData::value_,
::testing::VariantWith<std::remove_cv_t<decltype(result)>>(
IntOrDoubleEq(result)))),
&opentelemetry::sdk::metrics::HistogramPointData::min_,
::testing::VariantWith<
typename Extract<decltype(min_matcher)>::Type>(min_matcher)),
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::max_,
::testing::VariantWith<
typename Extract<decltype(max_matcher)>::Type>(max_matcher)),
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::count_,
::testing::Eq(count)))),
arg.point_data, result_listener);
}
MATCHER_P4(HistogramResultEq, sum, min, max, count, "") {
MATCHER_P(GaugeResultIs, value_matcher, "") {
return ::testing::ExplainMatchResult(
::testing::VariantWith<opentelemetry::sdk::metrics::HistogramPointData>(
::testing::VariantWith<opentelemetry::sdk::metrics::LastValuePointData>(
::testing::AllOf(
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::sum_,
::testing::VariantWith<std::remove_cv_t<decltype(sum)>>(
IntOrDoubleEq(sum))),
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::min_,
::testing::VariantWith<std::remove_cv_t<decltype(min)>>(
IntOrDoubleEq(min))),
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::max_,
::testing::VariantWith<std::remove_cv_t<decltype(max)>>(
IntOrDoubleEq(max))),
::testing::Field(
&opentelemetry::sdk::metrics::HistogramPointData::count_,
::testing::Eq(count)))),
&opentelemetry::sdk::metrics::LastValuePointData::value_,
::testing::VariantWith<
typename Extract<decltype(value_matcher)>::Type>(
value_matcher)),
::testing::Field(&opentelemetry::sdk::metrics::
LastValuePointData::is_lastvalue_valid_,
::testing::IsTrue()))),
arg.point_data, result_listener);
}
// This check might subject to system clock adjustment.
MATCHER_P(GaugeResultLaterThan, prev_timestamp, "") {
return ::testing::ExplainMatchResult(
::testing::VariantWith<opentelemetry::sdk::metrics::LastValuePointData>(
::testing::Field(
&opentelemetry::sdk::metrics::LastValuePointData::sample_ts_,
::testing::Property(
&opentelemetry::common::SystemTimestamp::time_since_epoch,
::testing::Gt(prev_timestamp.time_since_epoch())))),
arg.point_data, result_listener);
}
MATCHER_P7(GaugeDataIsIncrementalForSpecificMetricAndLabelSet, metric_name,
label_key, label_value, optional_label_key, optional_label_value,
default_value, greater_than, "") {
std::unordered_map<std::string,
opentelemetry::sdk::common::OwnedAttributeValue>
label_map;
PopulateLabelMap(label_key, label_value, &label_map);
PopulateLabelMap(optional_label_key, optional_label_value, &label_map);
opentelemetry::common::SystemTimestamp prev_timestamp;
auto prev_value = default_value;
size_t prev_index = 0;
auto& data = arg.at(metric_name);
bool result = true;
for (size_t i = 1; i < data.size(); ++i) {
if (::testing::Matches(::testing::UnorderedElementsAreArray(
data[i - 1].attributes.GetAttributes()))(label_map)) {
// Update the previous value for the same associated label values.
prev_value = opentelemetry::nostd::get<decltype(prev_value)>(
opentelemetry::nostd::get<
opentelemetry::sdk::metrics::LastValuePointData>(
data[i - 1].point_data)
.value_);
prev_index = i - 1;
prev_timestamp = opentelemetry::nostd::get<
opentelemetry::sdk::metrics::LastValuePointData>(
data[i - 1].point_data)
.sample_ts_;
}
if (!::testing::Matches(::testing::UnorderedElementsAreArray(
data[i].attributes.GetAttributes()))(label_map)) {
// Skip values that do not have the same associated label values.
continue;
}
*result_listener << " Comparing data[" << i << "] with data[" << prev_index
<< "] ";
if (greater_than) {
result &= ::testing::ExplainMatchResult(
::testing::AllOf(
AttributesEq(label_key, label_value, optional_label_key,
optional_label_value),
GaugeResultIs(::testing::Gt(prev_value)),
GaugeResultLaterThan(prev_timestamp)),
data[i], result_listener);
} else {
result &= ::testing::ExplainMatchResult(
::testing::AllOf(
AttributesEq(label_key, label_value, optional_label_key,
optional_label_value),
GaugeResultIs(::testing::Ge(prev_value)),
GaugeResultLaterThan(prev_timestamp)),
data[i], result_listener);
}
}
return result;
}
TEST(OpenTelemetryPluginBuildTest, ApiDependency) {
opentelemetry::metrics::Provider::GetMeterProvider();
}
@ -1049,12 +1136,13 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordUInt64Counter) {
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(data, ::testing::ElementsAre(::testing::Pair(
kMetricName, ::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues,
kOptionalLabelKeys,
kOptionalLabelValues),
CounterResultEq(kCounterResult))))));
EXPECT_THAT(data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
CounterResultEq(::testing::Eq(kCounterResult)))))));
}
TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordDoubleCounter) {
@ -1094,12 +1182,13 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordDoubleCounter) {
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(data, ::testing::ElementsAre(::testing::Pair(
kMetricName, ::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues,
kOptionalLabelKeys,
kOptionalLabelValues),
CounterResultEq(kCounterResult))))));
EXPECT_THAT(data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
CounterResultEq(::testing::DoubleEq(kCounterResult)))))));
}
TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordUInt64Histogram) {
@ -1142,13 +1231,14 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordUInt64Histogram) {
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
EXPECT_THAT(data,
::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
EXPECT_THAT(
data, ::testing::ElementsAre(::testing::Pair(
kMetricName,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(::testing::Eq(kSum), ::testing::Eq(kMin),
::testing::Eq(kMax), kCount))))));
}
TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordDoubleHistogram) {
@ -1198,7 +1288,9 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, RecordDoubleHistogram) {
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
HistogramResultEq(::testing::DoubleEq(kSum),
::testing::DoubleEq(kMin),
::testing::DoubleEq(kMax), kCount))))));
}
TEST_F(OpenTelemetryPluginNPCMetricsTest,
@ -1254,7 +1346,9 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
HistogramResultEq(::testing::DoubleEq(kSum),
::testing::DoubleEq(kMin),
::testing::DoubleEq(kMax), kCount))))));
}
// Now build and register another OpenTelemetryPlugin using the test fixture
// and record histogram.
@ -1290,7 +1384,9 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
HistogramResultEq(::testing::DoubleEq(kSum),
::testing::DoubleEq(kMin),
::testing::DoubleEq(kMax), kCount))))));
// Verify that the first plugin gets the data as well.
data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
@ -1304,7 +1400,9 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kOptionalLabelKeys,
kOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
HistogramResultEq(::testing::DoubleEq(kSum),
::testing::DoubleEq(kMin),
::testing::DoubleEq(kMax), kCount))))));
}
TEST_F(OpenTelemetryPluginNPCMetricsTest,
@ -1322,15 +1420,16 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
constexpr std::array<absl::string_view, 4> kOptionalLabelKeys = {
"optional_label_key_1", "optional_label_key_2", "optional_label_key_3",
"optional_label_key_4"};
constexpr std::array<absl::string_view, 2> kActualOptionalLabelKeys = {
"optional_label_key_1", "optional_label_key_2"};
constexpr std::array<absl::string_view, 3> kActualOptionalLabelKeys = {
"optional_label_key_1", "optional_label_key_2", "optional_label_key_4"};
constexpr std::array<absl::string_view, 2> kLabelValues = {"label_value_1",
"label_value_2"};
constexpr std::array<absl::string_view, 4> kOptionalLabelValues = {
"optional_label_value_1", "optional_label_value_2",
"optional_label_value_3", "optional_label_value_4"};
constexpr std::array<absl::string_view, 2> kActualOptionalLabelValues = {
"optional_label_value_1", "optional_label_value_2"};
constexpr std::array<absl::string_view, 3> kActualOptionalLabelValues = {
"optional_label_value_1", "optional_label_value_2",
"optional_label_value_4"};
auto handle = grpc_core::GlobalInstrumentsRegistry::RegisterDoubleHistogram(
kMetricName, "A simple double histogram.", "unit", kLabelKeys,
kOptionalLabelKeys, /*enable_by_default=*/true);
@ -1342,7 +1441,8 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
GRPC_ARG_SERVER_SELECTOR_VALUE;
})
.add_optional_label(kOptionalLabelKeys[0])
.add_optional_label(kOptionalLabelKeys[1])));
.add_optional_label(kOptionalLabelKeys[1])
.add_optional_label(kOptionalLabelKeys[3])));
grpc_core::ChannelArgs args;
args = args.Set(GRPC_ARG_SERVER_SELECTOR_KEY, GRPC_ARG_SERVER_SELECTOR_VALUE);
auto stats_plugins =
@ -1363,7 +1463,274 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest,
::testing::ElementsAre(::testing::AllOf(
AttributesEq(kLabelKeys, kLabelValues, kActualOptionalLabelKeys,
kActualOptionalLabelValues),
HistogramResultEq(kSum, kMin, kMax, kCount))))));
HistogramResultEq(::testing::DoubleEq(kSum),
::testing::DoubleEq(kMin),
::testing::DoubleEq(kMax), kCount))))));
}
using OpenTelemetryPluginCallbackMetricsTest = OpenTelemetryPluginEnd2EndTest;
// The callback minimal interval is longer than the OT reporting interval, so we
// expect to collect duplicated (cached) values.
TEST_F(OpenTelemetryPluginCallbackMetricsTest,
ReportDurationLongerThanCollectDuration) {
constexpr absl::string_view kInt64CallbackGaugeMetric =
"int64_callback_gauge";
constexpr absl::string_view kDoubleCallbackGaugeMetric =
"double_callback_gauge";
constexpr std::array<absl::string_view, 2> kLabelKeys = {"label_key_1",
"label_key_2"};
constexpr std::array<absl::string_view, 2> kOptionalLabelKeys = {
"optional_label_key_1", "optional_label_key_2"};
constexpr std::array<absl::string_view, 2> kLabelValuesSet1 = {
"label_value_set_1", "label_value_set_1"};
constexpr std::array<absl::string_view, 2> kOptionalLabelValuesSet1 = {
"optional_label_value_set_1", "optional_label_value_set_1"};
constexpr std::array<absl::string_view, 2> kLabelValuesSet2 = {
"label_value_set_2", "label_value_set_2"};
constexpr std::array<absl::string_view, 2> kOptionalLabelValuesSet2 = {
"optional_label_value_set_2", "optional_label_value_set_2"};
auto integer_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit",
kLabelKeys, kOptionalLabelKeys,
/*enable_by_default=*/true);
auto double_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge(
kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit",
kLabelKeys, kOptionalLabelKeys,
/*enable_by_default=*/true);
Init(std::move(Options()
.set_metric_names({kInt64CallbackGaugeMetric,
kDoubleCallbackGaugeMetric})
.add_optional_label(kOptionalLabelKeys[0])
.add_optional_label(kOptionalLabelKeys[1])));
auto stats_plugins =
grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
grpc_core::experimental::StatsPluginChannelScope(
"dns:///localhost:8080", ""));
// Multiple callbacks for the same metrics, each reporting different label
// values.
int report_count_1 = 0;
int64_t int_value_1 = 1;
double double_value_1 = 0.5;
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
{integer_gauge_handle, double_gauge_handle},
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor());
int report_count_2 = 0;
int64_t int_value_2 = 1;
double double_value_2 = 0.5;
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
{integer_gauge_handle, double_gauge_handle},
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor());
constexpr int kIterations = 100;
MetricsCollectorThread collector{
this, grpc_core::Duration::Milliseconds(10) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains(kInt64CallbackGaugeMetric) ||
!data.contains(kDoubleCallbackGaugeMetric);
}};
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
data = collector.Stop();
// Verify that data is incremental with duplications (cached values).
EXPECT_LT(report_count_1, kIterations);
EXPECT_LT(report_count_2, kIterations);
EXPECT_EQ(data[kInt64CallbackGaugeMetric].size(),
data[kDoubleCallbackGaugeMetric].size());
// Verify labels.
ASSERT_THAT(
data,
::testing::UnorderedElementsAre(
::testing::Pair(
kInt64CallbackGaugeMetric,
::testing::Each(::testing::AnyOf(
AttributesEq(kLabelKeys, kLabelValuesSet1, kOptionalLabelKeys,
kOptionalLabelValuesSet1),
AttributesEq(kLabelKeys, kLabelValuesSet2, kOptionalLabelKeys,
kOptionalLabelValuesSet2)))),
::testing::Pair(
kDoubleCallbackGaugeMetric,
::testing::Each(::testing::AnyOf(
AttributesEq(kLabelKeys, kLabelValuesSet1, kOptionalLabelKeys,
kOptionalLabelValuesSet1),
AttributesEq(kLabelKeys, kLabelValuesSet2, kOptionalLabelKeys,
kOptionalLabelValuesSet2))))));
EXPECT_THAT(data, GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kInt64CallbackGaugeMetric, kLabelKeys, kLabelValuesSet1,
kOptionalLabelKeys, kOptionalLabelValuesSet1,
int64_t(0), false));
EXPECT_THAT(data, GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kInt64CallbackGaugeMetric, kLabelKeys, kLabelValuesSet2,
kOptionalLabelKeys, kOptionalLabelValuesSet2,
int64_t(0), false));
EXPECT_THAT(data,
GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kDoubleCallbackGaugeMetric, kLabelKeys, kLabelValuesSet1,
kOptionalLabelKeys, kOptionalLabelValuesSet1, 0.0, false));
EXPECT_THAT(data,
GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kDoubleCallbackGaugeMetric, kLabelKeys, kLabelValuesSet2,
kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, false));
}
// The callback minimal interval is shorter than the OT reporting interval, so
// for each collect we should go update the cache and report the latest values.
TEST_F(OpenTelemetryPluginCallbackMetricsTest,
ReportDurationShorterThanCollectDuration) {
constexpr absl::string_view kInt64CallbackGaugeMetric =
"yet_another_int64_callback_gauge";
constexpr absl::string_view kDoubleCallbackGaugeMetric =
"yet_another_double_callback_gauge";
constexpr std::array<absl::string_view, 2> kLabelKeys = {"label_key_1",
"label_key_2"};
constexpr std::array<absl::string_view, 2> kOptionalLabelKeys = {
"optional_label_key_1", "optional_label_key_2"};
constexpr std::array<absl::string_view, 2> kLabelValuesSet1 = {
"label_value_set_1", "label_value_set_1"};
constexpr std::array<absl::string_view, 2> kOptionalLabelValuesSet1 = {
"optional_label_value_set_1", "optional_label_value_set_1"};
constexpr std::array<absl::string_view, 2> kLabelValuesSet2 = {
"label_value_set_2", "label_value_set_2"};
constexpr std::array<absl::string_view, 2> kOptionalLabelValuesSet2 = {
"optional_label_value_set_2", "optional_label_value_set_2"};
auto integer_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit",
kLabelKeys, kOptionalLabelKeys,
/*enable_by_default=*/true);
auto double_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge(
kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit",
kLabelKeys, kOptionalLabelKeys,
/*enable_by_default=*/true);
Init(std::move(Options()
.set_metric_names({kInt64CallbackGaugeMetric,
kDoubleCallbackGaugeMetric})
.add_optional_label(kOptionalLabelKeys[0])
.add_optional_label(kOptionalLabelKeys[1])));
auto stats_plugins =
grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
grpc_core::experimental::StatsPluginChannelScope(
"dns:///localhost:8080", ""));
// Multiple callbacks for the same metrics, each reporting different label
// values.
int report_count_1 = 0;
int64_t int_value_1 = 1;
double double_value_1 = 0.5;
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_1, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_1++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
{integer_gauge_handle, double_gauge_handle},
grpc_core::Duration::Milliseconds(10) * grpc_test_slowdown_factor());
int report_count_2 = 0;
int64_t int_value_2 = 1;
double double_value_2 = 0.5;
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(integer_gauge_handle, int_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
reporter.Report(double_gauge_handle, double_value_2, kLabelValuesSet1,
kOptionalLabelValuesSet1);
reporter.Report(double_gauge_handle, double_value_2++, kLabelValuesSet2,
kOptionalLabelValuesSet2);
},
{integer_gauge_handle, double_gauge_handle},
grpc_core::Duration::Milliseconds(10) * grpc_test_slowdown_factor());
constexpr int kIterations = 100;
MetricsCollectorThread collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains(kInt64CallbackGaugeMetric) ||
!data.contains(kDoubleCallbackGaugeMetric);
}};
absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>
data = collector.Stop();
// Verify that data is incremental without duplications (cached values).
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, kIterations);
EXPECT_EQ(data[kInt64CallbackGaugeMetric].size(),
data[kDoubleCallbackGaugeMetric].size());
// Verify labels.
ASSERT_THAT(
data,
::testing::UnorderedElementsAre(
::testing::Pair(
kInt64CallbackGaugeMetric,
::testing::Each(::testing::AnyOf(
AttributesEq(kLabelKeys, kLabelValuesSet1, kOptionalLabelKeys,
kOptionalLabelValuesSet1),
AttributesEq(kLabelKeys, kLabelValuesSet2, kOptionalLabelKeys,
kOptionalLabelValuesSet2)))),
::testing::Pair(
kDoubleCallbackGaugeMetric,
::testing::Each(::testing::AnyOf(
AttributesEq(kLabelKeys, kLabelValuesSet1, kOptionalLabelKeys,
kOptionalLabelValuesSet1),
AttributesEq(kLabelKeys, kLabelValuesSet2, kOptionalLabelKeys,
kOptionalLabelValuesSet2))))));
EXPECT_THAT(data, GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kInt64CallbackGaugeMetric, kLabelKeys, kLabelValuesSet1,
kOptionalLabelKeys, kOptionalLabelValuesSet1,
int64_t(0), true));
EXPECT_THAT(data, GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kInt64CallbackGaugeMetric, kLabelKeys, kLabelValuesSet2,
kOptionalLabelKeys, kOptionalLabelValuesSet2,
int64_t(0), true));
EXPECT_THAT(data,
GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kDoubleCallbackGaugeMetric, kLabelKeys, kLabelValuesSet1,
kOptionalLabelKeys, kOptionalLabelValuesSet1, 0.0, true));
EXPECT_THAT(data,
GaugeDataIsIncrementalForSpecificMetricAndLabelSet(
kDoubleCallbackGaugeMetric, kLabelKeys, kLabelValuesSet2,
kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, true));
}
TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) {

@ -18,6 +18,8 @@
#include "test/cpp/ext/otel/otel_test_library.h"
#include <atomic>
#include "absl/functional/any_invocable.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@ -83,6 +85,52 @@ const grpc_channel_filter AddServiceLabelsFilter::kFilter =
grpc_core::FilterEndpoint::kClient>(
"add_service_labels_filter");
OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::MetricsCollectorThread(
OpenTelemetryPluginEnd2EndTest* test, grpc_core::Duration interval,
int iterations,
std::function<
bool(const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&)>
predicate)
: test_(test),
interval_(interval),
iterations_(iterations),
predicate_(std::move(predicate)),
thread_(&MetricsCollectorThread::Run, this) {}
OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::
~MetricsCollectorThread() {
if (!finished_) {
thread_.join();
}
}
void OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Run() {
int i = 0;
while (i++ < iterations_ || (iterations_ == -1 && !finished_)) {
auto data_points = test_->ReadCurrentMetricsData(predicate_);
for (auto data : data_points) {
auto iter = data_points_.find(data.first);
if (iter == data_points_.end()) {
data_points_[data.first] = std::move(data.second);
} else {
for (auto point : data.second) {
iter->second.push_back(std::move(point));
}
}
}
absl::SleepFor(absl::Milliseconds(interval_.millis()));
}
}
const OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::ResultType&
OpenTelemetryPluginEnd2EndTest::MetricsCollectorThread::Stop() {
finished_ = true;
thread_.join();
return data_points_;
}
void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
grpc_core::CoreConfiguration::Reset();
ChannelArguments channel_args;

@ -21,6 +21,9 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <thread>
#include "absl/functional/any_invocable.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@ -146,6 +149,29 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
absl::flat_hash_set<absl::string_view> optional_label_keys;
};
class MetricsCollectorThread {
public:
using ResultType = absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>;
MetricsCollectorThread(OpenTelemetryPluginEnd2EndTest* test,
grpc_core::Duration interval, int iterations,
std::function<bool(const ResultType&)> predicate);
~MetricsCollectorThread();
const ResultType& Stop();
private:
void Run();
OpenTelemetryPluginEnd2EndTest* test_;
grpc_core::Duration interval_;
int iterations_;
std::function<bool(const ResultType&)> predicate_;
ResultType data_points_;
std::atomic_bool finished_{false};
std::thread thread_;
};
// Note that we can't use SetUp() here since we want to send in parameters.
void Init(Options config);

@ -25,6 +25,7 @@ grpc_cc_test(
name = "grpclb_api_test",
srcs = ["grpclb_api_test.cc"],
external_deps = [
"absl/log:check",
"gtest",
],
deps = [

@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include "absl/log/check.h"
#include "google/protobuf/duration.upb.h"
#include "upb/mem/arena.hpp"
@ -45,7 +46,7 @@ class GrpclbTest : public ::testing::Test {
std::string Ip4ToPackedString(const char* ip_str) {
struct in_addr ip4;
GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
CHECK(inet_pton(AF_INET, ip_str, &ip4) == 1);
return std::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
}
@ -59,7 +60,7 @@ std::string PackedStringToIp(const grpc_core::GrpcLbServer& server) {
} else {
abort();
}
GPR_ASSERT(inet_ntop(af, (void*)server.ip_addr, ip_str, 46) != nullptr);
CHECK(inet_ntop(af, (void*)server.ip_addr, ip_str, 46) != nullptr);
return ip_str;
}

@ -23,7 +23,6 @@ tools/distrib/check_naked_includes.py --fix || true
tools/distrib/check_copyright.py --fix
tools/distrib/add-iwyu.py
tools/distrib/check_trailing_newlines.sh --fix
tools/run_tests/sanity/check_port_platform.py --fix
tools/run_tests/sanity/check_include_style.py --fix || true
tools/distrib/check_namespace_qualification.py --fix
tools/distrib/black_code.sh

@ -19,8 +19,7 @@ set -eo pipefail
readonly GITHUB_REPOSITORY_NAME="grpc"
readonly TEST_DRIVER_INSTALL_SCRIPT_URL="https://raw.githubusercontent.com/${TEST_DRIVER_REPO_OWNER:-grpc}/psm-interop/${TEST_DRIVER_BRANCH:-main}/.kokoro/psm_interop_kokoro_lib.sh"
## xDS test server/client Docker images
## We're only testing Python client for now
readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/cpp-server"
readonly SERVER_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/python-server"
readonly CLIENT_IMAGE_NAME="gcr.io/grpc-testing/xds-interop/python-client"
readonly FORCE_IMAGE_BUILD="${FORCE_IMAGE_BUILD:-0}"
readonly BUILD_APP_PATH="interop-testing/build/install/grpc-interop-testing"
@ -48,7 +47,7 @@ build_test_app_docker_images() {
.
docker build \
-f tools/dockerfile/interoptest/grpc_interop_cxx_xds/Dockerfile.xds_server \
-f src/python/grpcio_tests/tests_py3_only/interop/Dockerfile.server \
-t "${SERVER_IMAGE_NAME}:${GIT_COMMIT}" \
.
@ -176,7 +175,9 @@ main() {
local failed_tests=0
test_suites=(
"gamma.gamma_baseline_test"
"gamma.affinity_session_drain_test"
"gamma.affinity_test"
"app_net_ssa_test"
)
for test in "${test_suites[@]}"; do
run_test $test || (( ++failed_tests ))

@ -440,6 +440,7 @@ LANG_RELEASE_MATRIX = {
("v1.59.1", ReleaseInfo()),
("v1.60.1", ReleaseInfo()),
("v1.61.0", ReleaseInfo()),
("v1.63.0", ReleaseInfo()),
]
),
"python": OrderedDict(

Loading…
Cancel
Save