[OTel] Add API to allow filtering target attribute on client side call/attempt metrics (#34285)

Based on https://github.com/grpc/proposal/pull/380
pull/34289/head
Yash Tibrewal 2 years ago committed by GitHub
parent 3d1f242abe
commit 1d136fd05f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      src/cpp/ext/otel/otel_client_filter.cc
  2. 10
      src/cpp/ext/otel/otel_plugin.cc
  3. 15
      src/cpp/ext/otel/otel_plugin.h
  4. 74
      test/cpp/ext/otel/otel_plugin_test.cc
  5. 5
      test/cpp/ext/otel/otel_test_library.cc
  6. 4
      test/cpp/ext/otel/otel_test_library.h

@ -27,6 +27,7 @@
#include <string>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
@ -70,8 +71,14 @@ const grpc_channel_filter OpenTelemetryClientFilter::kFilter =
absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
return OpenTelemetryClientFilter(
args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or(""));
std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
// Use the original target string only if a filter on the attribute is not
// registered or if the filter returns true, otherwise use "other".
if (OTelPluginState().target_attribute_filter == nullptr ||
OTelPluginState().target_attribute_filter(target)) {
return OpenTelemetryClientFilter(std::move(target));
}
return OpenTelemetryClientFilter("other");
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>

@ -131,6 +131,14 @@ OpenTelemetryPluginBuilder& OpenTelemetryPluginBuilder::SetTargetSelector(
return *this;
}
OpenTelemetryPluginBuilder&
OpenTelemetryPluginBuilder::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
target_attribute_filter_ = std::move(target_attribute_filter);
return *this;
}
void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider = meter_provider_;
@ -182,6 +190,8 @@ void OpenTelemetryPluginBuilder::BuildAndRegisterGlobal() {
OTelServerCallRcvdTotalCompressedMessageSizeInstrumentName()));
}
g_otel_plugin_state_->labels_injector = std::move(labels_injector_);
g_otel_plugin_state_->target_attribute_filter =
std::move(target_attribute_filter_);
g_otel_plugin_state_->meter_provider = std::move(meter_provider);
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory);

@ -97,6 +97,8 @@ struct OTelPluginState {
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>
meter_provider;
std::unique_ptr<LabelsInjector> labels_injector;
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter;
};
const struct OTelPluginState& OTelPluginState();
@ -133,12 +135,21 @@ class OpenTelemetryPluginBuilder {
OpenTelemetryPluginBuilder& SetLabelsInjector(
std::unique_ptr<LabelsInjector> labels_injector);
// If set, \a target_selector is called once per channel to decide whether to
// If set, \a target_selector is called per channel to decide whether to
// collect metrics on that target or not.
OpenTelemetryPluginBuilder& SetTargetSelector(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector);
// If set, \a target_attribute_filter is called per channel to decide whether
// to record the target attribute on client or to replace it with "other".
// This helps reduce the cardinality on metrics in cases where many channels
// are created with different targets in the same binary (which might happen
// for example, if the channel target string uses IP addresses directly).
OpenTelemetryPluginBuilder& SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter);
void BuildAndRegisterGlobal();
// The base set of metrics -
@ -155,6 +166,8 @@ class OpenTelemetryPluginBuilder {
private:
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider_;
std::unique_ptr<LabelsInjector> labels_injector_;
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter_;
absl::flat_hash_set<std::string> metrics_;
absl::AnyInvocable<bool(absl::string_view /*target*/) const> target_selector_;
};

@ -341,6 +341,80 @@ TEST_F(OTelPluginEnd2EndTest, TargetSelectorReturnsFalse) {
ASSERT_TRUE(data.empty());
}
// Test that a target attribute filter returning true records metrics with the
// target as is on the channel.
TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsTrue) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/[](absl::string_view /*target*/) {
return true;
});
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
auto client_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(client_started_value, nullptr);
EXPECT_EQ(*client_started_value, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(attributes.size(), 2);
const auto* method_value =
absl::get_if<std::string>(&attributes.at("grpc.method"));
ASSERT_NE(method_value, nullptr);
EXPECT_EQ(*method_value, kMethodName);
const auto* target_value =
absl::get_if<std::string>(&attributes.at("grpc.target"));
ASSERT_NE(target_value, nullptr);
EXPECT_EQ(*target_value, canonical_server_address_);
}
// Test that a target attribute filter returning false records metrics with the
// target as "other".
TEST_F(OTelPluginEnd2EndTest, TargetAttributeFilterReturnsFalse) {
Init({grpc::internal::OTelClientAttemptStartedInstrumentName()}, /*resource=*/
opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false,
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/
[server_address = canonical_server_address_](
absl::string_view /*target*/) { return false; });
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
/*data*/) { return false; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0].point_data);
ASSERT_NE(point_data, nullptr);
auto client_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(client_started_value, nullptr);
EXPECT_EQ(*client_started_value, 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(attributes.size(), 2);
const auto* method_value =
absl::get_if<std::string>(&attributes.at("grpc.method"));
ASSERT_NE(method_value, nullptr);
EXPECT_EQ(*method_value, kMethodName);
const auto* target_value =
absl::get_if<std::string>(&attributes.at("grpc.target"));
ASSERT_NE(target_value, nullptr);
EXPECT_EQ(*target_value, "other");
}
} // namespace
} // namespace testing
} // namespace grpc

@ -41,7 +41,9 @@ void OTelPluginEnd2EndTest::Init(
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector,
bool test_no_meter_provider,
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector) {
target_selector,
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
@ -63,6 +65,7 @@ void OTelPluginEnd2EndTest::Init(
}
ot_builder.SetLabelsInjector(std::move(labels_injector));
ot_builder.SetTargetSelector(std::move(target_selector));
ot_builder.SetTargetAttributeFilter(std::move(target_attribute_filter));
ot_builder.BuildAndRegisterGlobal();
grpc_init();
grpc::ServerBuilder builder;

@ -63,7 +63,9 @@ class OTelPluginEnd2EndTest : public ::testing::Test {
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector = nullptr,
bool test_no_meter_provider = false,
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector =
target_selector = absl::AnyInvocable<bool(absl::string_view) const>(),
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter =
absl::AnyInvocable<bool(absl::string_view) const>());
void TearDown() override;

Loading…
Cancel
Save