[OTel C++] Fix race when adding and removing callbacks (#37485)

Split off from https://github.com/grpc/grpc/pull/37425

We are adding and removing callbacks on the OpenTelemetry Async Instruments without synchronization. This opens us to races where we have an AddCallback and RemoveCallback operation happening at the same time. The correct result after these operations is to still have a callback registered with OpenTelemetry at the end, but the two operations could race and we could just decide to remove the OpenTelemetry callback.

The fix delays removing OpenTelemetry callbacks to plugin destruction time.

Closes #37485

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37485 from yashykt:FixRaceOTelGauge 016b0a41b5
PiperOrigin-RevId: 663492598
pull/37507/head
Yash Tibrewal 6 months ago committed by Copybara-Service
parent 8b4a8ebdf9
commit 605a15e7eb
  1. 67
      src/cpp/ext/otel/otel_plugin.cc
  2. 1
      src/cpp/ext/otel/otel_plugin.h
  3. 120
      test/cpp/ext/otel/otel_plugin_test.cc

@ -563,6 +563,38 @@ OpenTelemetryPluginImpl::OpenTelemetryPluginImpl(
});
}
OpenTelemetryPluginImpl::~OpenTelemetryPluginImpl() {
for (const auto& instrument_data : instruments_data_) {
grpc_core::Match(
instrument_data.instrument, [](const Disabled&) {},
[](const std::unique_ptr<opentelemetry::metrics::Counter<double>>&) {},
[](const std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>>&) {
},
[](const std::unique_ptr<
opentelemetry::metrics::Histogram<uint64_t>>&) {},
[](const std::unique_ptr<opentelemetry::metrics::Histogram<double>>&) {
},
[](const std::unique_ptr<CallbackGaugeState<int64_t>>& state) {
CHECK(state->caches.empty());
if (state->ot_callback_registered) {
state->instrument->RemoveCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback,
state.get());
state->ot_callback_registered = false;
}
},
[](const std::unique_ptr<CallbackGaugeState<double>>& state) {
CHECK(state->caches.empty());
if (state->ot_callback_registered) {
state->instrument->RemoveCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback,
state.get());
state->ot_callback_registered = false;
}
});
}
}
namespace {
constexpr absl::string_view kLocality = "grpc.lb.locality";
}
@ -823,9 +855,6 @@ void OpenTelemetryPluginImpl::AddCallback(
void OpenTelemetryPluginImpl::RemoveCallback(
grpc_core::RegisteredMetricCallback* callback) {
std::vector<
absl::variant<CallbackGaugeState<int64_t>*, CallbackGaugeState<double>*>>
gauges_that_need_to_remove_callback;
{
grpc_core::MutexLock lock(&mu_);
callback_timestamps_.erase(callback);
@ -848,11 +877,6 @@ void OpenTelemetryPluginImpl::RemoveCallback(
CHECK_NE(callback_gauge_state, nullptr);
CHECK((*callback_gauge_state)->ot_callback_registered);
CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
break;
}
case grpc_core::GlobalInstrumentsRegistry::ValueType::kDouble: {
@ -867,11 +891,6 @@ void OpenTelemetryPluginImpl::RemoveCallback(
CHECK_NE(callback_gauge_state, nullptr);
CHECK((*callback_gauge_state)->ot_callback_registered);
CHECK_EQ((*callback_gauge_state)->caches.erase(callback), 1u);
if ((*callback_gauge_state)->caches.empty()) {
gauges_that_need_to_remove_callback.push_back(
callback_gauge_state->get());
(*callback_gauge_state)->ot_callback_registered = false;
}
break;
}
default:
@ -880,21 +899,13 @@ void OpenTelemetryPluginImpl::RemoveCallback(
}
}
}
// RemoveCallback internally grabs OpenTelemetry's observable_registry's
// lock. So we need to call it without our plugin lock otherwise we may
// deadlock.
for (const auto& gauge : gauges_that_need_to_remove_callback) {
grpc_core::Match(
gauge,
[](CallbackGaugeState<int64_t>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<int64_t>::CallbackGaugeCallback, gauge);
},
[](CallbackGaugeState<double>* gauge) {
gauge->instrument->RemoveCallback(
&CallbackGaugeState<double>::CallbackGaugeCallback, gauge);
});
}
// Note that we are not removing the callback from OpenTelemetry immediately,
// and instead remove it when the plugin is destroyed. We just have a single
// callback per OpenTelemetry instrument which is a small number. If we decide
// to remove the callback immediately at this point, we need to make sure that
// 1) the callback is removed without holding mu_ and 2) we make sure that
// this does not race against a possible `AddCallback` operation. A potential
// way to do this is to use WorkSerializer.
}
template <typename ValueType>

@ -223,6 +223,7 @@ class OpenTelemetryPluginImpl
absl::AnyInvocable<
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter);
~OpenTelemetryPluginImpl() override;
private:
class ClientCallTracer;

@ -1715,14 +1715,8 @@ TEST_F(OpenTelemetryPluginNPCMetricsTest, InstrumentsEnabledTest) {
EXPECT_FALSE(stats_plugins.IsInstrumentEnabled(counter_handle));
}
class OpenTelemetryPluginCallbackMetricsTest
: public OpenTelemetryPluginEnd2EndTest {
protected:
OpenTelemetryPluginCallbackMetricsTest()
: endpoint_config_(grpc_core::ChannelArgs()) {}
grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config_;
};
using OpenTelemetryPluginCallbackMetricsTest =
OpenTelemetryPluginNPCMetricsTest;
// The callback minimal interval is longer than the OT reporting interval, so we
// expect to collect duplicated (cached) values.
@ -1981,6 +1975,116 @@ TEST_F(OpenTelemetryPluginCallbackMetricsTest,
kOptionalLabelKeys, kOptionalLabelValuesSet2, 0.0, true));
}
// Verifies that callbacks are cleaned up when the OpenTelemetry plugin is
// destroyed.
TEST_F(OpenTelemetryPluginCallbackMetricsTest, VerifyCallbacksAreCleanedUp) {
constexpr absl::string_view kInt64CallbackGaugeMetric =
"yet_another_int64_callback_gauge";
constexpr absl::string_view kDoubleCallbackGaugeMetric =
"yet_another_double_callback_gauge";
auto integer_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackInt64Gauge(
kInt64CallbackGaugeMetric, "An int64 callback gauge.", "unit",
/*enable_by_default=*/true)
.Build();
auto double_gauge_handle =
grpc_core::GlobalInstrumentsRegistry::RegisterCallbackDoubleGauge(
kDoubleCallbackGaugeMetric, "A double callback gauge.", "unit",
/*enable_by_default=*/true)
.Build();
Init(std::move(Options().set_metric_names(
{kInt64CallbackGaugeMetric, kDoubleCallbackGaugeMetric})));
auto stats_plugins =
grpc_core::GlobalStatsPluginRegistry::GetStatsPluginsForChannel(
grpc_core::experimental::StatsPluginChannelScope(
"dns:///localhost:8080", "", endpoint_config_));
// Multiple callbacks for the same metrics, each reporting different
// label values.
int report_count_1 = 0;
int64_t int_value_1 = 1;
double double_value_1 = 0.5;
auto registered_metric_callback_1 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_1;
reporter.Report(integer_gauge_handle, int_value_1++, {}, {});
reporter.Report(double_gauge_handle, double_value_1++, {}, {});
},
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
int report_count_2 = 0;
int64_t int_value_2 = 1;
double double_value_2 = 0.5;
auto registered_metric_callback_2 = stats_plugins.RegisterCallback(
[&](grpc_core::CallbackMetricReporter& reporter) {
++report_count_2;
reporter.Report(integer_gauge_handle, int_value_2++, {}, {});
reporter.Report(double_gauge_handle, double_value_2++, {}, {});
},
grpc_core::Duration::Milliseconds(50) * grpc_test_slowdown_factor(),
integer_gauge_handle, double_gauge_handle);
constexpr int kIterations = 50;
{
MetricsCollectorThread collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains(kInt64CallbackGaugeMetric) ||
!data.contains(kDoubleCallbackGaugeMetric);
}};
}
// Verify that callbacks are invoked
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, kIterations);
// Remove one of the callbacks
registered_metric_callback_1.reset();
{
MetricsCollectorThread new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
}
EXPECT_EQ(report_count_1, kIterations); // No change since previous
EXPECT_EQ(report_count_2, 2 * kIterations); // Gets another kIterations
// Remove the other callback as well
registered_metric_callback_2.reset();
MetricsCollectorThread new_new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
// We shouldn't get any new callbacks
EXPECT_THAT(new_new_collector.Stop(), ::testing::IsEmpty());
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, 2 * kIterations);
// Reset stats plugins as well
grpc_core::GlobalStatsPluginRegistryTestPeer::
ResetGlobalStatsPluginRegistry();
registered_metric_callback_2.reset();
MetricsCollectorThread new_new_new_collector{
this,
grpc_core::Duration::Milliseconds(100) * grpc_test_slowdown_factor(),
kIterations,
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return false; }};
// Still no new callbacks
EXPECT_THAT(new_new_new_collector.Stop(), ::testing::IsEmpty());
EXPECT_EQ(report_count_1, kIterations);
EXPECT_EQ(report_count_2, 2 * kIterations);
}
TEST(OpenTelemetryPluginMetricsEnablingDisablingTest, TestEnableDisableAPIs) {
grpc::internal::OpenTelemetryPluginBuilderImpl builder;
// First disable all metrics

Loading…
Cancel
Save