[OTel C++] Add ability to select channels for stats based on the target (#34273)

@ctiller PTAL for core configuration changes. I converted the type from
std::function to absl::AnyInvocable. Do you think the functor in
RegisteredBuilder should be callable just once or multiple times?
<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34285/head
Yash Tibrewal 1 year ago committed by GitHub
parent b388a7e250
commit d4ca41d22d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/cpp/ext/otel/BUILD
  2. 28
      src/cpp/ext/otel/otel_plugin.cc
  3. 8
      src/cpp/ext/otel/otel_plugin.h
  4. 53
      test/cpp/ext/otel/otel_plugin_test.cc
  5. 5
      test/cpp/ext/otel/otel_test_library.cc
  6. 5
      test/cpp/ext/otel/otel_test_library.h

@ -45,6 +45,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",

@ -22,6 +22,7 @@
#include <limits.h>
#include <type_traits>
#include <utility>
#include "opentelemetry/metrics/meter.h"
@ -31,7 +32,9 @@
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
@ -88,6 +91,7 @@ absl::string_view OTelServerCallSentTotalCompressedMessageSizeInstrumentName() {
absl::string_view OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName() {
return "grpc.server.call.rcvd_total_compressed_message_size";
}
//
// OpenTelemetryPluginBuilder
//
@ -120,6 +124,13 @@ OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetLabelsInjector(
return *this;
}
OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector) {
target_selector_ = std::move(target_selector);
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
@ -175,12 +186,21 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
[target_selector = std::move(target_selector_)](
grpc_core::CoreConfiguration::Builder* builder) mutable {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenTelemetryClientFilter::kFilter);
[target_selector = std::move(target_selector)](
grpc_core::ChannelStackBuilder* builder) {
// Only register the filter if no channel selector has been set or
// the target selector returns true for the target.
if (target_selector == nullptr ||
target_selector(builder->channel_args()
.GetString(GRPC_ARG_SERVER_URI)
.value_or(""))) {
builder->PrependFilter(
&grpc::internal::OpenTelemetryClientFilter::kFilter);
}
return true;
});
});

@ -29,6 +29,7 @@
#include <utility>
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "opentelemetry/metrics/meter_provider.h"
@ -132,6 +133,12 @@ class OpenTelemetryPluginBuilder {
OpenTelemetryPluginBuilder& SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector);
// If set, \a target_selector is called once per channel to decide whether to
// collect metrics on that target or not.
OpenTelemetryPluginBuilder& SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector);
void BuildAndRegisterGlobal();
// The base set of metrics -
@ -149,6 +156,7 @@ class OpenTelemetryPluginBuilder {
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider_;
std::unique_ptr<LabelsInjector> labels_injector_;
absl::flat_hash_set<std::string> metrics_;
absl::AnyInvocable<bool(absl::string_view /*target*/) const> target_selector_;
};
} // namespace internal

@ -288,6 +288,59 @@ TEST_F(OTelPluginEnd2EndTest, NoMeterProviderRegistered) {
SendRPC();
}
// Test that a channel selector returning true records metrics on the channel.
TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsTrue) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
/*target_selector=*/
[](absl::string_view /*target*/) { return true; });
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
auto client_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(client_started_value, nullptr);
EXPECT_EQ(*client_started_value, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(attributes.size(), 2);
const auto* method_value =
absl::get_if<std::string>(&attributes.at("grpc.method"));
ASSERT_NE(method_value, nullptr);
EXPECT_EQ(*method_value, kMethodName);
const auto* target_value =
absl::get_if<std::string>(&attributes.at("grpc.target"));
ASSERT_NE(target_value, nullptr);
EXPECT_EQ(*target_value, canonical_server_address_);
}
// Test that a target selector returning false does not record metrics on the
// channel.
TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsFalse) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
/*target_selector=*/
[server_address = canonical_server_address_](
absl::string_view /*target*/) { return false; });
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
/*data*/) { return false; });
ASSERT_TRUE(data.empty());
}
} // namespace
} // namespace testing
} // namespace grpc

@ -39,7 +39,9 @@ void OTelPluginEnd2EndTest::Init(
const absl::flat_hash_set<absl::string_view>& metric_names,
opentelemetry::sdk::resource::Resource resource,
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector,
bool test_no_meter_provider) {
bool test_no_meter_provider,
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
@ -60,6 +62,7 @@ void OTelPluginEnd2EndTest::Init(
ot_builder.SetMeterProvider(std::move(meter_provider));
}
ot_builder.SetLabelsInjector(std::move(labels_injector));
ot_builder.SetTargetSelector(std::move(target_selector));
ot_builder.BuildAndRegisterGlobal();
grpc_init();
grpc::ServerBuilder builder;

@ -61,7 +61,10 @@ class OTelPluginEnd2EndTest : public ::testing::Test {
opentelemetry::sdk::resource::Resource resource =
opentelemetry::sdk::resource::Resource::Create({}),
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector = nullptr,
bool test_no_meter_provider = false);
bool test_no_meter_provider = false,
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector =
absl::AnyInvocable<bool(absl::string_view) const>());
void TearDown() override;

Loading…
Cancel
Save