[CSM O11Y] CSM Service Label Plumbing from LB Policies to CallAttemptTracer (#35210)

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #35210

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35210 from yijiem:csm-service-label 6a6a7d1774
PiperOrigin-RevId: 597641393
pull/35529/head
Yijie Ma 1 year ago committed by Copybara-Service
parent 22682a78f6
commit 77ad5a786e
  1. 2
      CMakeLists.txt
  2. 6
      build_autogenerated.yaml
  3. 7
      src/core/ext/filters/client_channel/client_channel.cc
  4. 2
      src/core/ext/filters/client_channel/client_channel_internal.h
  5. 10
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  6. 32
      src/core/ext/xds/xds_cluster.cc
  7. 2
      src/core/ext/xds/xds_cluster.h
  8. 7
      src/core/lib/channel/call_tracer.cc
  9. 10
      src/core/lib/channel/call_tracer.h
  10. 42
      src/cpp/ext/csm/metadata_exchange.cc
  11. 14
      src/cpp/ext/csm/metadata_exchange.h
  12. 3
      src/cpp/ext/filters/census/open_census_call_tracer.h
  13. 40
      src/cpp/ext/otel/key_value_iterable.h
  14. 7
      src/cpp/ext/otel/otel_call_tracer.h
  15. 20
      src/cpp/ext/otel/otel_client_filter.cc
  16. 26
      src/cpp/ext/otel/otel_plugin.h
  17. 13
      src/cpp/ext/otel/otel_server_call_tracer.cc
  18. 40
      src/proto/grpc/testing/xds/v3/base.proto
  19. 7
      src/proto/grpc/testing/xds/v3/cluster.proto
  20. 4
      src/python/grpcio_observability/grpc_observability/client_call_tracer.h
  21. 1
      test/core/channel/BUILD
  22. 102
      test/core/channel/call_tracer_test.cc
  23. 4
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  24. 5
      test/core/end2end/tests/http2_stats.cc
  25. 10
      test/core/util/BUILD
  26. 82
      test/core/util/fake_stats_plugin.cc
  27. 192
      test/core/util/fake_stats_plugin.h
  28. 90
      test/core/xds/xds_cluster_resource_type_test.cc
  29. 1
      test/cpp/end2end/xds/BUILD
  30. 39
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  31. 39
      test/cpp/ext/csm/metadata_exchange_test.cc
  32. 28
      test/cpp/ext/otel/otel_plugin_test.cc
  33. 59
      test/cpp/ext/otel/otel_test_library.cc
  34. 2
      test/cpp/ext/otel/otel_test_library.h

2
CMakeLists.txt generated

@ -7809,6 +7809,7 @@ if(gRPC_BUILD_TESTS)
add_executable(call_tracer_test add_executable(call_tracer_test
test/core/channel/call_tracer_test.cc test/core/channel/call_tracer_test.cc
test/core/util/fake_stats_plugin.cc
) )
target_compile_features(call_tracer_test PUBLIC cxx_std_14) target_compile_features(call_tracer_test PUBLIC cxx_std_14)
target_include_directories(call_tracer_test target_include_directories(call_tracer_test
@ -27043,6 +27044,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/string.grpc.pb.h
test/core/util/fake_stats_plugin.cc
test/cpp/end2end/connection_attempt_injector.cc test/cpp/end2end/connection_attempt_injector.cc
test/cpp/end2end/test_service_impl.cc test/cpp/end2end/test_service_impl.cc
test/cpp/end2end/xds/xds_cluster_end2end_test.cc test/cpp/end2end/xds/xds_cluster_end2end_test.cc

@ -6418,9 +6418,11 @@ targets:
gtest: true gtest: true
build: test build: test
language: c++ language: c++
headers: [] headers:
- test/core/util/fake_stats_plugin.h
src: src:
- test/core/channel/call_tracer_test.cc - test/core/channel/call_tracer_test.cc
- test/core/util/fake_stats_plugin.cc
deps: deps:
- gtest - gtest
- grpc_test_util - grpc_test_util
@ -18016,6 +18018,7 @@ targets:
run: false run: false
language: c++ language: c++
headers: headers:
- test/core/util/fake_stats_plugin.h
- test/core/util/scoped_env_var.h - test/core/util/scoped_env_var.h
- test/cpp/end2end/connection_attempt_injector.h - test/cpp/end2end/connection_attempt_injector.h
- test/cpp/end2end/counted_service.h - test/cpp/end2end/counted_service.h
@ -18056,6 +18059,7 @@ targets:
- src/proto/grpc/testing/xds/v3/route.proto - src/proto/grpc/testing/xds/v3/route.proto
- src/proto/grpc/testing/xds/v3/router.proto - src/proto/grpc/testing/xds/v3/router.proto
- src/proto/grpc/testing/xds/v3/string.proto - src/proto/grpc/testing/xds/v3/string.proto
- test/core/util/fake_stats_plugin.cc
- test/cpp/end2end/connection_attempt_injector.cc - test/cpp/end2end/connection_attempt_injector.cc
- test/cpp/end2end/test_service_impl.cc - test/cpp/end2end/test_service_impl.cc
- test/cpp/end2end/xds/xds_cluster_end2end_test.cc - test/cpp/end2end/xds/xds_cluster_end2end_test.cc

@ -2603,6 +2603,8 @@ class ClientChannel::LoadBalancedCall::LbCallState
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
UniqueTypeName type) const override; UniqueTypeName type) const override;
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override;
private: private:
LoadBalancedCall* lb_call_; LoadBalancedCall* lb_call_;
}; };
@ -2694,6 +2696,11 @@ ClientChannel::LoadBalancedCall::LbCallState::GetCallAttribute(
return service_config_call_data->GetCallAttribute(type); return service_config_call_data->GetCallAttribute(type);
} }
ClientCallTracer::CallAttemptTracer*
ClientChannel::LoadBalancedCall::LbCallState::GetCallAttemptTracer() const {
return lb_call_->call_attempt_tracer();
}
// //
// ClientChannel::LoadBalancedCall::BackendMetricAccessor // ClientChannel::LoadBalancedCall::BackendMetricAccessor
// //

@ -25,6 +25,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/context.h" #include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy.h"
@ -49,6 +50,7 @@ class ClientChannelLbCallState : public LoadBalancingPolicy::CallState {
public: public:
virtual ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( virtual ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
UniqueTypeName type) const = 0; UniqueTypeName type) const = 0;
virtual ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const = 0;
}; };
// Internal type for ServiceConfigCallData. Handles call commits. // Internal type for ServiceConfigCallData. Handles call commits.

@ -37,6 +37,7 @@
#include <grpc/impl/connectivity_state.h> #include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
@ -76,6 +77,8 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace { namespace {
using OptionalLabelComponent =
ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
using XdsConfig = XdsDependencyManager::XdsConfig; using XdsConfig = XdsDependencyManager::XdsConfig;
// //
@ -215,6 +218,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_; RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
uint32_t max_concurrent_requests_; uint32_t max_concurrent_requests_;
std::shared_ptr<std::map<std::string, std::string>> service_labels_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_; RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_; RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<SubchannelPicker> picker_; RefCountedPtr<SubchannelPicker> picker_;
@ -358,6 +362,7 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
: call_counter_(xds_cluster_impl_lb->call_counter_), : call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_( max_concurrent_requests_(
xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests), xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
service_labels_(xds_cluster_impl_lb->cluster_resource_->telemetry_labels),
drop_config_(xds_cluster_impl_lb->drop_config_), drop_config_(xds_cluster_impl_lb->drop_config_),
drop_stats_(xds_cluster_impl_lb->drop_stats_), drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) { picker_(std::move(picker)) {
@ -369,6 +374,11 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) { LoadBalancingPolicy::PickArgs args) {
auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
if (call_state->GetCallAttemptTracer() != nullptr) {
call_state->GetCallAttemptTracer()->AddOptionalLabels(
OptionalLabelComponent::kXdsServiceLabels, service_labels_);
}
// Handle EDS drops. // Handle EDS drops.
const std::string* drop_category; const std::string* drop_category;
if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) { if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {

@ -48,6 +48,7 @@
#include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h" #include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
#include "google/protobuf/any.upb.h" #include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h" #include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
#include "google/protobuf/wrappers.upb.h" #include "google/protobuf/wrappers.upb.h"
#include "upb/base/string_view.h" #include "upb/base/string_view.h"
#include "upb/text/encode.h" #include "upb/text/encode.h"
@ -703,6 +704,37 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
cds_update->override_host_statuses.Add( cds_update->override_host_statuses.Add(
XdsHealthStatus(XdsHealthStatus::kHealthy)); XdsHealthStatus(XdsHealthStatus::kHealthy));
} }
// Record telemetry labels (if any).
const envoy_config_core_v3_Metadata* metadata =
envoy_config_cluster_v3_Cluster_metadata(cluster);
if (metadata != nullptr) {
google_protobuf_Struct* telemetry_labels_struct;
if (envoy_config_core_v3_Metadata_filter_metadata_get(
metadata,
StdStringToUpbString(
absl::string_view("com.google.csm.telemetry_labels")),
&telemetry_labels_struct)) {
auto telemetry_labels =
std::make_shared<std::map<std::string, std::string>>();
size_t iter = kUpb_Map_Begin;
const google_protobuf_Struct_FieldsEntry* fields_entry;
while ((fields_entry = google_protobuf_Struct_fields_next(
telemetry_labels_struct, &iter)) != nullptr) {
// Adds any entry whose value is a string to telemetry_labels.
const google_protobuf_Value* value =
google_protobuf_Struct_FieldsEntry_value(fields_entry);
if (google_protobuf_Value_has_string_value(value)) {
telemetry_labels->emplace(
UpbStringToStdString(
google_protobuf_Struct_FieldsEntry_key(fields_entry)),
UpbStringToStdString(google_protobuf_Value_string_value(value)));
}
}
if (!telemetry_labels->empty()) {
cds_update->telemetry_labels = std::move(telemetry_labels);
}
}
}
// Return result. // Return result.
if (!errors.ok()) { if (!errors.ok()) {
return errors.status(absl::StatusCode::kInvalidArgument, return errors.status(absl::StatusCode::kInvalidArgument,

@ -101,6 +101,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
XdsHealthStatusSet override_host_statuses; XdsHealthStatusSet override_host_statuses;
std::shared_ptr<std::map<std::string, std::string>> telemetry_labels;
bool operator==(const XdsClusterResource& other) const { bool operator==(const XdsClusterResource& other) const {
return type == other.type && lb_policy_config == other.lb_policy_config && return type == other.type && lb_policy_config == other.lb_policy_config &&
lrs_load_reporting_server == other.lrs_load_reporting_server && lrs_load_reporting_server == other.lrs_load_reporting_server &&

@ -146,6 +146,13 @@ class DelegatingClientCallTracer : public ClientCallTracer {
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override { std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr; return nullptr;
} }
void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) override {
for (auto* tracer : tracers_) {
tracer->AddOptionalLabels(component, labels);
}
}
std::string TraceId() override { return tracers_[0]->TraceId(); } std::string TraceId() override { return tracers_[0]->TraceId(); }
std::string SpanId() override { return tracers_[0]->SpanId(); } std::string SpanId() override { return tracers_[0]->SpanId(); }
bool IsSampled() override { return tracers_[0]->IsSampled(); } bool IsSampled() override { return tracers_[0]->IsSampled(); }

@ -128,6 +128,11 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
// as transparent retry attempts.) // as transparent retry attempts.)
class CallAttemptTracer : public CallTracerInterface { class CallAttemptTracer : public CallTracerInterface {
public: public:
enum class OptionalLabelComponent : std::uint8_t {
kXdsServiceLabels = 0,
kSize = 1, // keep last
};
~CallAttemptTracer() override {} ~CallAttemptTracer() override {}
// TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata` // TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata`
// and `RecordEnd` should be moved into CallTracerInterface. // and `RecordEnd` should be moved into CallTracerInterface.
@ -140,6 +145,11 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
// Should be the last API call to the object. Once invoked, the tracer // Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object. // library is free to destroy the object.
virtual void RecordEnd(const gpr_timespec& latency) = 0; virtual void RecordEnd(const gpr_timespec& latency) = 0;
// Adds optional labels to be reported by the underlying tracer in a call.
virtual void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) = 0;
}; };
~ClientCallTracer() override {} ~ClientCallTracer() override {}

@ -42,6 +42,7 @@
#include <grpc/slice.h> #include <grpc/slice.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/env.h" #include "src/core/lib/gprpp/env.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/iomgr/load_file.h"
@ -49,10 +50,14 @@
#include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/json/json_reader.h" #include "src/core/lib/json/json_reader.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/cpp/ext/otel/key_value_iterable.h"
namespace grpc { namespace grpc {
namespace internal { namespace internal {
using OptionalLabelComponent =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
namespace { namespace {
// The keys that will be used in the Metadata Exchange between local and remote. // The keys that will be used in the Metadata Exchange between local and remote.
@ -427,5 +432,42 @@ void ServiceMeshLabelsInjector::AddLabels(
serialized_labels_to_send_.Ref()); serialized_labels_to_send_.Ref());
} }
bool ServiceMeshLabelsInjector::AddOptionalLabels(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
callback) const {
// According to the CSM Observability Metric spec, if the control plane fails
// to provide these labels, the client will set their values to "unknown".
// These default values are set below.
absl::string_view service_name = "unknown";
absl::string_view service_namespace = "unknown";
// Performs JSON label name format to CSM Observability Metric spec format
// conversion.
if (optional_labels_span.size() >
static_cast<size_t>(OptionalLabelComponent::kXdsServiceLabels)) {
const auto& optional_labels = optional_labels_span[static_cast<size_t>(
OptionalLabelComponent::kXdsServiceLabels)];
if (optional_labels != nullptr) {
auto it = optional_labels->find("service_name");
if (it != optional_labels->end()) service_name = it->second;
it = optional_labels->find("service_namespace");
if (it != optional_labels->end()) service_namespace = it->second;
}
}
return callback("csm.service_name",
AbslStrViewToOpenTelemetryStrView(service_name)) &&
callback("csm.service_namespace_name",
AbslStrViewToOpenTelemetryStrView(service_namespace));
}
size_t ServiceMeshLabelsInjector::GetOptionalLabelsSize(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>)
const {
return 2;
}
} // namespace internal } // namespace internal
} // namespace grpc } // namespace grpc

@ -50,6 +50,20 @@ class ServiceMeshLabelsInjector : public LabelsInjector {
void AddLabels(grpc_metadata_batch* outgoing_initial_metadata, void AddLabels(grpc_metadata_batch* outgoing_initial_metadata,
LabelsIterable* labels_from_incoming_metadata) const override; LabelsIterable* labels_from_incoming_metadata) const override;
// Add optional labels to the traced calls.
bool AddOptionalLabels(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
callback) const override;
// Gets the size of the actual optional labels.
size_t GetOptionalLabelsSize(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span) const override;
private: private:
std::vector<std::pair<absl::string_view, std::string>> local_labels_; std::vector<std::pair<absl::string_view, std::string>> local_labels_;
grpc_core::Slice serialized_labels_to_send_; grpc_core::Slice serialized_labels_to_send_;

@ -103,6 +103,9 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordAnnotation(absl::string_view annotation) override; void RecordAnnotation(absl::string_view annotation) override;
void RecordAnnotation(const Annotation& annotation) override; void RecordAnnotation(const Annotation& annotation) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override; std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(
OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>) override {}
experimental::CensusContext* context() { return &context_; } experimental::CensusContext* context() { return &context_; }

@ -53,11 +53,16 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
const std::vector<std::unique_ptr<LabelsIterable>>& const std::vector<std::unique_ptr<LabelsIterable>>&
injected_labels_from_plugin_options, injected_labels_from_plugin_options,
absl::Span<const std::pair<absl::string_view, absl::string_view>> absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels) additional_labels,
const ActivePluginOptionsView* active_plugin_options_view,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span)
: injected_labels_iterable_(injected_labels_iterable), : injected_labels_iterable_(injected_labels_iterable),
injected_labels_from_plugin_options_( injected_labels_from_plugin_options_(
injected_labels_from_plugin_options), injected_labels_from_plugin_options),
additional_labels_(additional_labels) {} additional_labels_(additional_labels),
active_plugin_options_view_(active_plugin_options_view),
optional_labels_(optional_labels_span) {}
bool ForEachKeyValue(opentelemetry::nostd::function_ref< bool ForEachKeyValue(opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view, bool(opentelemetry::nostd::string_view,
@ -72,6 +77,21 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
} }
} }
} }
if (OpenTelemetryPluginState().labels_injector != nullptr &&
!OpenTelemetryPluginState().labels_injector->AddOptionalLabels(
optional_labels_, callback)) {
return false;
}
if (active_plugin_options_view_ != nullptr &&
!active_plugin_options_view_->ForEach(
[callback, this](
const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
return plugin_option.labels_injector()->AddOptionalLabels(
optional_labels_, callback);
})) {
return false;
}
for (const auto& plugin_option_injected_iterable : for (const auto& plugin_option_injected_iterable :
injected_labels_from_plugin_options_) { injected_labels_from_plugin_options_) {
if (plugin_option_injected_iterable != nullptr) { if (plugin_option_injected_iterable != nullptr) {
@ -104,6 +124,19 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
} }
} }
size += additional_labels_.size(); size += additional_labels_.size();
if (OpenTelemetryPluginState().labels_injector != nullptr) {
size += OpenTelemetryPluginState().labels_injector->GetOptionalLabelsSize(
optional_labels_);
}
if (active_plugin_options_view_ != nullptr) {
active_plugin_options_view_->ForEach(
[&size, this](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
size += plugin_option.labels_injector()->GetOptionalLabelsSize(
optional_labels_);
return true;
});
}
return size; return size;
} }
@ -113,6 +146,9 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
injected_labels_from_plugin_options_; injected_labels_from_plugin_options_;
absl::Span<const std::pair<absl::string_view, absl::string_view>> absl::Span<const std::pair<absl::string_view, absl::string_view>>
additional_labels_; additional_labels_;
const ActivePluginOptionsView* active_plugin_options_view_;
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_;
}; };
} // namespace internal } // namespace internal

@ -91,6 +91,9 @@ class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
void RecordAnnotation(absl::string_view /*annotation*/) override; void RecordAnnotation(absl::string_view /*annotation*/) override;
void RecordAnnotation(const Annotation& /*annotation*/) override; void RecordAnnotation(const Annotation& /*annotation*/) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override; std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>>
optional_labels) override;
private: private:
const OpenTelemetryCallTracer* parent_; const OpenTelemetryCallTracer* parent_;
@ -98,6 +101,10 @@ class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
// Start time (for measuring latency). // Start time (for measuring latency).
absl::Time start_time_; absl::Time start_time_;
std::unique_ptr<LabelsIterable> injected_labels_; std::unique_ptr<LabelsIterable> injected_labels_;
// The indices of the array correspond to the OptionalLabelComponent enum.
std::array<std::shared_ptr<std::map<std::string, std::string>>,
static_cast<size_t>(OptionalLabelComponent::kSize)>
optional_labels_array_;
std::vector<std::unique_ptr<LabelsIterable>> std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_; injected_labels_from_plugin_options_;
}; };

@ -132,7 +132,9 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
// avoid recording a subset of injected labels here. // avoid recording a subset of injected labels here.
OpenTelemetryPluginState().client.attempt.started->Add( OpenTelemetryPluginState().client.attempt.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {}, 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
additional_labels)); additional_labels,
/*active_plugin_options_view=*/nullptr,
/*optional_labels_span=*/{}));
} }
} }
@ -150,6 +152,7 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
injected_labels_from_plugin_options_.push_back( injected_labels_from_plugin_options_.push_back(
labels_injector->GetLabels(recv_initial_metadata)); labels_injector->GetLabels(recv_initial_metadata));
} }
return true;
}); });
} }
@ -166,6 +169,7 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
if (labels_injector != nullptr) { if (labels_injector != nullptr) {
labels_injector->AddLabels(send_initial_metadata, nullptr); labels_injector->AddLabels(send_initial_metadata, nullptr);
} }
return true;
}); });
} }
@ -206,9 +210,10 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
{OpenTelemetryStatusKey(), {OpenTelemetryStatusKey(),
grpc_status_code_to_string( grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}}; static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(injected_labels_.get(), KeyValueIterable labels(
injected_labels_from_plugin_options_, injected_labels_.get(), injected_labels_from_plugin_options_,
additional_labels); additional_labels, &parent_->parent_->active_plugin_options_view(),
optional_labels_array_);
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) { if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record( OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels, absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
@ -262,6 +267,13 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::StartNewTcpTrace() {
return nullptr; return nullptr;
} }
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> optional_labels) {
optional_labels_array_[static_cast<std::size_t>(component)] =
std::move(optional_labels);
}
// //
// OpenTelemetryCallTracer // OpenTelemetryCallTracer
// //

@ -79,6 +79,23 @@ class LabelsInjector {
virtual void AddLabels( virtual void AddLabels(
grpc_metadata_batch* outgoing_initial_metadata, grpc_metadata_batch* outgoing_initial_metadata,
LabelsIterable* labels_from_incoming_metadata) const = 0; LabelsIterable* labels_from_incoming_metadata) const = 0;
// Adds optional labels to the traced calls. Each entry in the span
// corresponds to the CallAttemptTracer::OptionalLabelComponent enum. Returns
// false when callback returns false.
virtual bool AddOptionalLabels(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
callback) const = 0;
// Gets the actual size of the optional labels that the Plugin is going to
// produce through the AddOptionalLabels method.
virtual size_t GetOptionalLabelsSize(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span) const = 0;
}; };
class InternalOpenTelemetryPluginOption class InternalOpenTelemetryPluginOption
@ -223,14 +240,17 @@ class ActivePluginOptionsView {
}); });
} }
void ForEach( bool ForEach(
absl::FunctionRef<void(const InternalOpenTelemetryPluginOption&, size_t)> absl::FunctionRef<bool(const InternalOpenTelemetryPluginOption&, size_t)>
func) const { func) const {
for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size(); for (size_t i = 0; i < OpenTelemetryPluginState().plugin_options.size();
++i) { ++i) {
const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i]; const auto& plugin_option = OpenTelemetryPluginState().plugin_options[i];
if (active_mask_[i]) func(*plugin_option, i); if (active_mask_[i] && !func(*plugin_option, i)) {
return false;
}
} }
return true;
} }
private: private:

@ -95,6 +95,7 @@ class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
send_initial_metadata, send_initial_metadata,
injected_labels_from_plugin_options_[index].get()); injected_labels_from_plugin_options_[index].get());
} }
return true;
}); });
} }
@ -186,6 +187,7 @@ void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
injected_labels_from_plugin_options_[index] = injected_labels_from_plugin_options_[index] =
labels_injector->GetLabels(recv_initial_metadata); labels_injector->GetLabels(recv_initial_metadata);
} }
return true;
}); });
registered_method_ = registered_method_ =
recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod()) recv_initial_metadata->get(grpc_core::GrpcRegisteredMethod())
@ -197,7 +199,8 @@ void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
// avoid recording a subset of injected labels here. // avoid recording a subset of injected labels here.
OpenTelemetryPluginState().server.call.started->Add( OpenTelemetryPluginState().server.call.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {}, 1, KeyValueIterable(/*injected_labels_iterable=*/nullptr, {},
additional_labels)); additional_labels,
/*active_plugin_options_view=*/nullptr, {}));
} }
} }
@ -215,9 +218,11 @@ void OpenTelemetryServerCallTracer::RecordEnd(
{{OpenTelemetryMethodKey(), MethodForStats()}, {{OpenTelemetryMethodKey(), MethodForStats()},
{OpenTelemetryStatusKey(), {OpenTelemetryStatusKey(),
grpc_status_code_to_string(final_info->final_status)}}}; grpc_status_code_to_string(final_info->final_status)}}};
KeyValueIterable labels(injected_labels_.get(), // Currently we do not have any optional labels on the server side.
injected_labels_from_plugin_options_, KeyValueIterable labels(
additional_labels); injected_labels_.get(), injected_labels_from_plugin_options_,
additional_labels,
/*active_plugin_options_view=*/nullptr, /*optional_labels_span=*/{});
if (OpenTelemetryPluginState().server.call.duration != nullptr) { if (OpenTelemetryPluginState().server.call.duration != nullptr) {
OpenTelemetryPluginState().server.call.duration->Record( OpenTelemetryPluginState().server.call.duration->Record(
absl::ToDoubleSeconds(elapsed_time_), labels, absl::ToDoubleSeconds(elapsed_time_), labels,

@ -129,3 +129,43 @@ message TransportSocket {
google.protobuf.Any typed_config = 3; google.protobuf.Any typed_config = 3;
} }
} }
// Metadata provides additional inputs to filters based on matched listeners,
// filter chains, routes and endpoints. It is structured as a map, usually from
// filter name (in reverse DNS format) to metadata specific to the filter. Metadata
// key-values for a filter are merged as connection and request handling occurs,
// with later values for the same key overriding earlier values.
//
// An example use of metadata is providing additional values to
// http_connection_manager in the envoy.http_connection_manager.access_log
// namespace.
//
// Another example use of metadata is to per service config info in cluster metadata, which may get
// consumed by multiple filters.
//
// For load balancing, Metadata provides a means to subset cluster endpoints.
// Endpoints have a Metadata object associated and routes contain a Metadata
// object to match against. There are some well defined metadata used today for
// this purpose:
//
// * ``{"envoy.lb": {"canary": <bool> }}`` This indicates the canary status of an
// endpoint and is also used during header processing
// (x-envoy-upstream-canary) and for stats purposes.
// [#next-major-version: move to type/metadata/v2]
message Metadata {
// Key is the reverse DNS filter name, e.g. com.acme.widget. The ``envoy.*``
// namespace is reserved for Envoy's built-in filters.
// If both ``filter_metadata`` and
// :ref:`typed_filter_metadata <envoy_v3_api_field_config.core.v3.Metadata.typed_filter_metadata>`
// fields are present in the metadata with same keys,
// only ``typed_filter_metadata`` field will be parsed.
map<string, google.protobuf.Struct> filter_metadata = 1;
// Key is the reverse DNS filter name, e.g. com.acme.widget. The ``envoy.*``
// namespace is reserved for Envoy's built-in filters.
// The value is encoded as google.protobuf.Any.
// If both :ref:`filter_metadata <envoy_v3_api_field_config.core.v3.Metadata.filter_metadata>`
// and ``typed_filter_metadata`` fields are present in the metadata with same keys,
// only ``typed_filter_metadata`` field will be parsed.
map<string, google.protobuf.Any> typed_filter_metadata = 2;
}

@ -252,6 +252,13 @@ message Cluster {
// from the LRS stream here.] // from the LRS stream here.]
core.v3.ConfigSource lrs_server = 42; core.v3.ConfigSource lrs_server = 42;
// The Metadata field can be used to provide additional information about the
// cluster. It can be used for stats, logging, and varying filter behavior.
// Fields should use reverse DNS notation to denote which entity within Envoy
// will need the information. For instance, if the metadata is intended for
// the Router filter, the filter name should be specified as ``envoy.filters.http.router``.
core.v3.Metadata metadata = 25;
core.v3.TypedExtensionConfig upstream_config = 48; core.v3.TypedExtensionConfig upstream_config = 48;
} }

@ -73,6 +73,10 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordAnnotation(absl::string_view annotation) override; void RecordAnnotation(absl::string_view annotation) override;
void RecordAnnotation(const Annotation& annotation) override; void RecordAnnotation(const Annotation& annotation) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override; std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(
OptionalLabelComponent /*component*/,
std::shared_ptr<std::map<std::string, std::string>> /*labels*/)
override {}
private: private:
// Maximum size of trace context is sent on the wire. // Maximum size of trace context is sent on the wire.

@ -27,6 +27,7 @@ grpc_cc_test(
uses_polling = False, uses_polling = False,
deps = [ deps = [
"//:grpc", "//:grpc",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util", "//test/core/util:grpc_test_util",
], ],
) )

@ -18,7 +18,6 @@
#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/call_tracer.h"
#include <memory>
#include <vector> #include <vector>
#include "gtest/gtest.h" #include "gtest/gtest.h"
@ -26,115 +25,16 @@
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/util/fake_stats_plugin.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
namespace grpc_core { namespace grpc_core {
namespace { namespace {
class FakeClientCallTracer : public ClientCallTracer {
public:
class FakeClientCallAttemptTracer
: public ClientCallTracer::CallAttemptTracer {
public:
explicit FakeClientCallAttemptTracer(
std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeClientCallAttemptTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
void RecordReceivedTrailingMetadata(
absl::Status /*status*/,
grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* /*transport_stream_stats*/)
override {}
void RecordEnd(const gpr_timespec& /*latency*/) override { delete this; }
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
private:
std::vector<std::string>* annotation_logger_;
};
explicit FakeClientCallTracer(std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeClientCallTracer() override {}
CallAttemptTracer* StartNewAttempt(bool /*is_transparent_retry*/) override {
return GetContext<Arena>()->ManagedNew<FakeClientCallAttemptTracer>(
annotation_logger_);
}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
private:
std::vector<std::string>* annotation_logger_;
};
class FakeServerCallTracer : public ServerCallTracer {
public:
explicit FakeServerCallTracer(std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeServerCallTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordEnd(const grpc_call_final_info* /*final_info*/) override {}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
private:
std::vector<std::string>* annotation_logger_;
};
class CallTracerTest : public ::testing::Test { class CallTracerTest : public ::testing::Test {
protected: protected:
void SetUp() override { void SetUp() override {

@ -671,6 +671,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return nullptr; return nullptr;
} }
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override {
return nullptr;
}
std::vector<void*> allocations_; std::vector<void*> allocations_;
std::map<UniqueTypeName, ServiceConfigCallData::CallAttributeInterface*> std::map<UniqueTypeName, ServiceConfigCallData::CallAttributeInterface*>
attributes_; attributes_;

@ -97,6 +97,11 @@ class FakeCallTracer : public ClientCallTracer {
void RecordAnnotation(absl::string_view /*annotation*/) override {} void RecordAnnotation(absl::string_view /*annotation*/) override {}
void RecordAnnotation(const Annotation& /*annotation*/) override {} void RecordAnnotation(const Annotation& /*annotation*/) override {}
void AddOptionalLabels(
OptionalLabelComponent /*component*/,
std::shared_ptr<std::map<std::string, std::string>> /*labels*/)
override {}
static grpc_transport_stream_stats transport_stream_stats() { static grpc_transport_stream_stats transport_stream_stats() {
MutexLock lock(g_mu); MutexLock lock(g_mu);
return transport_stream_stats_; return transport_stream_stats_;

@ -491,3 +491,13 @@ grpc_cc_library(
"//src/core:resource_quota", "//src/core:resource_quota",
], ],
) )
grpc_cc_library(
name = "fake_stats_plugin",
srcs = ["fake_stats_plugin.cc"],
hdrs = ["fake_stats_plugin.h"],
deps = [
"//:grpc",
"//src/core:examine_stack",
],
)

@ -0,0 +1,82 @@
// Copyright 2023 The gRPC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/util/fake_stats_plugin.h"
#include "src/core/lib/config/core_configuration.h"
namespace grpc_core {
class FakeStatsClientFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<FakeStatsClientFilter> Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
private:
explicit FakeStatsClientFilter(
FakeClientCallTracerFactory* fake_client_call_tracer_factory);
FakeClientCallTracerFactory* const fake_client_call_tracer_factory_;
};
const grpc_channel_filter FakeStatsClientFilter::kFilter =
MakePromiseBasedFilter<FakeStatsClientFilter, FilterEndpoint::kClient>(
"fake_stats_client");
absl::StatusOr<FakeStatsClientFilter> FakeStatsClientFilter::Create(
const ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
auto* fake_client_call_tracer_factory =
args.GetPointer<FakeClientCallTracerFactory>(
GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY);
GPR_ASSERT(fake_client_call_tracer_factory != nullptr);
return FakeStatsClientFilter(fake_client_call_tracer_factory);
}
ArenaPromise<ServerMetadataHandle> FakeStatsClientFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
FakeClientCallTracer* client_call_tracer =
fake_client_call_tracer_factory_->CreateFakeClientCallTracer();
if (client_call_tracer != nullptr) {
auto* call_context = GetContext<grpc_call_context_element>();
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value =
client_call_tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy =
nullptr;
}
return next_promise_factory(std::move(call_args));
}
FakeStatsClientFilter::FakeStatsClientFilter(
FakeClientCallTracerFactory* fake_client_call_tracer_factory)
: fake_client_call_tracer_factory_(fake_client_call_tracer_factory) {}
void RegisterFakeStatsPlugin() {
CoreConfiguration::RegisterBuilder(
[](CoreConfiguration::Builder* builder) mutable {
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_CHANNEL,
&FakeStatsClientFilter::kFilter)
.If([](const ChannelArgs& args) {
return args.GetPointer<FakeClientCallTracerFactory>(
GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY) !=
nullptr;
});
});
}
} // namespace grpc_core

@ -0,0 +1,192 @@
// Copyright 2023 The gRPC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_UTIL_FAKE_STATS_PLUGIN_H
#define GRPC_TEST_CORE_UTIL_FAKE_STATS_PLUGIN_H
#include <memory>
#include <string>
#include <vector>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/channel/tcp_tracer.h"
namespace grpc_core {
// Registers a FakeStatsClientFilter as a client channel filter if there is a
// FakeClientCallTracerFactory in the channel args. This filter will use the
// FakeClientCallTracerFactory to create and inject a FakeClientCallTracer into
// the call context.
// Example usage:
// RegisterFakeStatsPlugin(); // before grpc_init()
//
// // Creates a FakeClientCallTracerFactory and adds it into the channel args.
// FakeClientCallTracerFactory fake_client_call_tracer_factory;
// ChannelArguments channel_args;
// channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
// &fake_client_call_tracer_factory);
//
// // After the system under test has been executed (e.g. an RPC has been
// // sent), use the FakeClientCallTracerFactory to verify certain
// // expectations.
// EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
// ->GetLastCallAttemptTracer()
// ->GetOptionalLabels(),
// VerifyCsmServiceLabels());
void RegisterFakeStatsPlugin();
class FakeClientCallTracer : public ClientCallTracer {
public:
class FakeClientCallAttemptTracer
: public ClientCallTracer::CallAttemptTracer {
public:
explicit FakeClientCallAttemptTracer(
std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeClientCallAttemptTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
void RecordReceivedTrailingMetadata(
absl::Status /*status*/,
grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* /*transport_stream_stats*/)
override {}
void RecordEnd(const gpr_timespec& /*latency*/) override {}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) override {
optional_labels_.emplace(component, std::move(labels));
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
const std::map<OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>>&
GetOptionalLabels() const {
return optional_labels_;
}
private:
std::vector<std::string>* annotation_logger_;
std::map<OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_;
};
explicit FakeClientCallTracer(std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeClientCallTracer() override {}
CallAttemptTracer* StartNewAttempt(bool /*is_transparent_retry*/) override {
call_attempt_tracers_.emplace_back(
new FakeClientCallAttemptTracer(annotation_logger_));
return call_attempt_tracers_.back().get();
}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
FakeClientCallAttemptTracer* GetLastCallAttemptTracer() const {
return call_attempt_tracers_.back().get();
}
private:
std::vector<std::string>* annotation_logger_;
std::vector<std::unique_ptr<FakeClientCallAttemptTracer>>
call_attempt_tracers_;
};
#define GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY \
"grpc.testing.inject_fake_client_call_tracer_factory"
class FakeClientCallTracerFactory {
public:
FakeClientCallTracer* CreateFakeClientCallTracer() {
fake_client_call_tracers_.emplace_back(
new FakeClientCallTracer(&annotation_logger_));
return fake_client_call_tracers_.back().get();
}
FakeClientCallTracer* GetLastFakeClientCallTracer() {
return fake_client_call_tracers_.back().get();
}
private:
std::vector<std::string> annotation_logger_;
std::vector<std::unique_ptr<FakeClientCallTracer>> fake_client_call_tracers_;
};
class FakeServerCallTracer : public ServerCallTracer {
public:
explicit FakeServerCallTracer(std::vector<std::string>* annotation_logger)
: annotation_logger_(annotation_logger) {}
~FakeServerCallTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const SliceBuffer& /*send_message*/) override {}
void RecordSendCompressedMessage(
const SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(const SliceBuffer& /*recv_message*/) override {}
void RecordReceivedDecompressedMessage(
const SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordEnd(const grpc_call_final_info* /*final_info*/) override {}
void RecordAnnotation(absl::string_view annotation) override {
annotation_logger_->push_back(std::string(annotation));
}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
private:
std::vector<std::string>* annotation_logger_;
};
} // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_FAKE_STATS_PLUGIN_H

@ -20,6 +20,7 @@
#include <google/protobuf/any.pb.h> #include <google/protobuf/any.pb.h>
#include <google/protobuf/duration.pb.h> #include <google/protobuf/duration.pb.h>
#include <google/protobuf/struct.pb.h>
#include <google/protobuf/wrappers.pb.h> #include <google/protobuf/wrappers.pb.h>
#include "absl/status/status.h" #include "absl/status/status.h"
@ -1613,6 +1614,95 @@ TEST_F(HostOverrideStatusTest, CanExplicitlySetToEmpty) {
EXPECT_EQ(resource.override_host_statuses.ToString(), "{}"); EXPECT_EQ(resource.override_host_statuses.ToString(), "{}");
} }
using TelemetryLabelTest = XdsClusterTest;
TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
*label_map["service_name"].mutable_string_value() = "abc";
*label_map["service_namespace"].mutable_string_value() = "xyz";
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(*resource.telemetry_labels,
::testing::UnorderedElementsAre(
::testing::Pair("service_name", "abc"),
::testing::Pair("service_namespace", "xyz")));
}
TEST_F(TelemetryLabelTest, MissingMetadataField) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.telemetry_labels, nullptr);
}
TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map = *filter_map["some_key"].mutable_fields();
*label_map["some_value"].mutable_string_value() = "abc";
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.telemetry_labels, nullptr);
}
TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
*label_map["string_value"].mutable_string_value() = "abc";
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
*list_value_values.Add()->mutable_string_value() = "efg";
list_value_values.Add()->set_number_value(3.14);
auto& struct_value_fields =
*label_map["struct_value"].mutable_struct_value()->mutable_fields();
struct_value_fields["bool_value"].set_bool_value(false);
std::string serialized_resource;
ASSERT_TRUE(cluster.SerializeToString(&serialized_resource));
auto* resource_type = XdsClusterResourceType::Get();
auto decode_result =
resource_type->Decode(decode_context_, serialized_resource);
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(
*resource.telemetry_labels,
::testing::UnorderedElementsAre(::testing::Pair("string_value", "abc")));
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc_core } // namespace grpc_core

@ -169,6 +169,7 @@ grpc_cc_test(
"//:gpr", "//:gpr",
"//:grpc", "//:grpc",
"//:grpc++", "//:grpc++",
"//test/core/util:fake_stats_plugin",
"//test/core/util:grpc_test_util", "//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var", "//test/core/util:scoped_env_var",
"//test/cpp/end2end:connection_attempt_injector", "//test/cpp/end2end:connection_attempt_injector",

@ -25,9 +25,12 @@
#include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/config_vars.h" #include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/surface/call.h"
#include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h" #include "src/proto/grpc/testing/xds/v3/orca_load_report.pb.h"
#include "test/core/util/fake_stats_plugin.h"
#include "test/core/util/scoped_env_var.h" #include "test/core/util/scoped_env_var.h"
#include "test/cpp/end2end/connection_attempt_injector.h" #include "test/cpp/end2end/connection_attempt_injector.h"
#include "test/cpp/end2end/xds/xds_end2end_test_lib.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
@ -42,6 +45,8 @@ using ::envoy::config::core::v3::HealthStatus;
using ::envoy::type::v3::FractionalPercent; using ::envoy::type::v3::FractionalPercent;
using ClientStats = LrsServiceImpl::ClientStats; using ClientStats = LrsServiceImpl::ClientStats;
using OptionalLabelComponent =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
constexpr char kLbDropType[] = "lb"; constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle"; constexpr char kThrottleDropType[] = "throttle";
@ -304,6 +309,39 @@ TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
WaitForBackend(DEBUG_LOCATION, 1); WaitForBackend(DEBUG_LOCATION, 1);
} }
TEST_P(CdsTest, VerifyCsmServiceLabelsParsing) {
// Injects a fake client call tracer factory. Try keep this at top.
grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
CreateAndStartBackends(1);
// Populates EDS resources.
EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
// Populates service labels to CDS resources.
auto cluster = default_cluster_;
auto& filter_map = *cluster.mutable_metadata()->mutable_filter_metadata();
auto& label_map =
*filter_map["com.google.csm.telemetry_labels"].mutable_fields();
*label_map["service_name"].mutable_string_value() = "myservice";
*label_map["service_namespace"].mutable_string_value() = "mynamespace";
balancer_->ads_service()->SetCdsResource(cluster);
ChannelArguments channel_args;
channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
&fake_client_call_tracer_factory);
ResetStub(/*failover_timeout_ms=*/0, &channel_args);
// Sends an RPC and verifies that the service labels are recorded in the fake
// client call tracer.
CheckRpcSendOk(DEBUG_LOCATION);
EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace"))))));
balancer_->Shutdown();
}
// //
// CDS deletion tests // CDS deletion tests
// //
@ -1874,6 +1912,7 @@ int main(int argc, char** argv) {
// Workaround Apple CFStream bug // Workaround Apple CFStream bug
grpc_core::SetEnv("grpc_cfstream", "0"); grpc_core::SetEnv("grpc_cfstream", "0");
#endif #endif
grpc_core::RegisterFakeStatsPlugin();
grpc_init(); grpc_init();
grpc::testing::ConnectionAttemptInjector::Init(); grpc::testing::ConnectionAttemptInjector::Init();
const auto result = RUN_ALL_TESTS(); const auto result = RUN_ALL_TESTS();

@ -113,7 +113,8 @@ class MetadataExchangeTest
public ::testing::WithParamInterface<TestScenario> { public ::testing::WithParamInterface<TestScenario> {
protected: protected:
void Init(const absl::flat_hash_set<absl::string_view>& metric_names, void Init(const absl::flat_hash_set<absl::string_view>& metric_names,
bool enable_client_side_injector = true) { bool enable_client_side_injector = true,
const std::map<std::string, std::string>& labels_to_inject = {}) {
const char* kBootstrap = const char* kBootstrap =
"{\"node\": {\"id\": " "{\"node\": {\"id\": "
"\"projects/1234567890/networks/mesh:mesh-id/nodes/" "\"projects/1234567890/networks/mesh:mesh-id/nodes/"
@ -137,7 +138,7 @@ class MetadataExchangeTest
/*labels_injector=*/ /*labels_injector=*/
std::make_unique<grpc::internal::ServiceMeshLabelsInjector>( std::make_unique<grpc::internal::ServiceMeshLabelsInjector>(
GetParam().GetTestResource().GetAttributes()), GetParam().GetTestResource().GetAttributes()),
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false, labels_to_inject,
/*target_selector=*/ /*target_selector=*/
[enable_client_side_injector](absl::string_view /*target*/) { [enable_client_side_injector](absl::string_view /*target*/) {
return enable_client_side_injector; return enable_client_side_injector;
@ -156,11 +157,19 @@ class MetadataExchangeTest
void VerifyServiceMeshAttributes( void VerifyServiceMeshAttributes(
const std::map<std::string, const std::map<std::string,
opentelemetry::sdk::common::OwnedAttributeValue>& opentelemetry::sdk::common::OwnedAttributeValue>&
attributes) { attributes,
bool verify_client_only_attributes = true) {
EXPECT_EQ( EXPECT_EQ(
absl::get<std::string>(attributes.at("csm.workload_canonical_service")), absl::get<std::string>(attributes.at("csm.workload_canonical_service")),
"canonical_service"); "canonical_service");
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.mesh_id")), "mesh-id"); EXPECT_EQ(absl::get<std::string>(attributes.at("csm.mesh_id")), "mesh-id");
if (verify_client_only_attributes) {
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.service_name")),
"unknown");
EXPECT_EQ(
absl::get<std::string>(attributes.at("csm.service_namespace_name")),
"unknown");
}
switch (GetParam().type()) { switch (GetParam().type()) {
case TestScenario::ResourceType::kGke: case TestScenario::ResourceType::kGke:
EXPECT_EQ( EXPECT_EQ(
@ -295,7 +304,8 @@ TEST_P(MetadataExchangeTest, ServerCallDuration) {
const auto& attributes = data[kMetricName][0].attributes.GetAttributes(); const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName); EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.method")), kMethodName);
EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.status")), "OK"); EXPECT_EQ(absl::get<std::string>(attributes.at("grpc.status")), "OK");
VerifyServiceMeshAttributes(attributes); VerifyServiceMeshAttributes(attributes,
/*verify_client_only_attributes=*/false);
} }
// Test that the server records unknown when the client does not send metadata // Test that the server records unknown when the client does not send metadata
@ -328,6 +338,27 @@ TEST_P(MetadataExchangeTest, ClientDoesNotSendMetadata) {
"unknown"); "unknown");
} }
TEST_P(MetadataExchangeTest, VerifyCsmServiceLabels) {
Init(/*metric_names=*/{grpc::experimental::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName},
/*enable_client_side_injector=*/true,
// Injects CSM service labels to be recorded in the call.
{{"service_name", "myservice"}, {"service_namespace", "mynamespace"}});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains(kMetricName); });
ASSERT_EQ(data[kMetricName].size(), 1);
const auto& attributes = data[kMetricName][0].attributes.GetAttributes();
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.service_name")),
"myservice");
EXPECT_EQ(absl::get<std::string>(attributes.at("csm.service_namespace_name")),
"mynamespace");
}
INSTANTIATE_TEST_SUITE_P( INSTANTIATE_TEST_SUITE_P(
MetadataExchange, MetadataExchangeTest, MetadataExchange, MetadataExchangeTest,
::testing::Values( ::testing::Values(

@ -309,6 +309,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest, TargetSelectorReturnsTrue) {
opentelemetry::sdk::resource::Resource::Create({}), opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/ /*target_selector=*/
[](absl::string_view /*target*/) { return true; }); [](absl::string_view /*target*/) { return true; });
SendRPC(); SendRPC();
@ -345,6 +346,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest, TargetSelectorReturnsFalse) {
opentelemetry::sdk::resource::Resource::Create({}), opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/ /*target_selector=*/
[](absl::string_view /*target*/) { return false; }); [](absl::string_view /*target*/) { return false; });
SendRPC(); SendRPC();
@ -364,6 +366,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest, TargetAttributeFilterReturnsTrue) {
opentelemetry::sdk::resource::Resource::Create({}), opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/[](absl::string_view /*target*/) { /*target_attribute_filter=*/[](absl::string_view /*target*/) {
return true; return true;
@ -402,6 +405,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest, TargetAttributeFilterReturnsFalse) {
opentelemetry::sdk::resource::Resource::Create({}), opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
[server_address = canonical_server_address_]( [server_address = canonical_server_address_](
@ -469,6 +473,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -508,6 +513,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -575,6 +581,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -613,6 +620,7 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -685,6 +693,22 @@ class CustomLabelInjector : public grpc::internal::LabelsInjector {
grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/) grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/)
const override {} const override {}
bool AddOptionalLabels(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
/*optional_labels_span*/,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
/*callback*/) const override {
return true;
}
size_t GetOptionalLabelsSize(
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
/*optional_labels_span*/) const override {
return 0;
}
private: private:
std::pair<std::string, std::string> label_; std::pair<std::string, std::string> label_;
}; };
@ -731,6 +755,7 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, Basic) {
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -772,6 +797,7 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ClientOnlyPluginOption) {
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -814,6 +840,7 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest, ServerOnlyPluginOption) {
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),
@ -870,6 +897,7 @@ TEST_F(OpenTelemetryPluginOptionEnd2EndTest,
/*resource=*/opentelemetry::sdk::resource::Resource::Create({}), /*resource=*/opentelemetry::sdk::resource::Resource::Create({}),
/*labels_injector=*/nullptr, /*labels_injector=*/nullptr,
/*test_no_meter_provider=*/false, /*test_no_meter_provider=*/false,
/*labels_to_inject=*/{},
/*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(), /*target_selector=*/absl::AnyInvocable<bool(absl::string_view) const>(),
/*target_attribute_filter=*/ /*target_attribute_filter=*/
absl::AnyInvocable<bool(absl::string_view) const>(), absl::AnyInvocable<bool(absl::string_view) const>(),

@ -29,6 +29,7 @@
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/notification.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
@ -38,11 +39,55 @@
namespace grpc { namespace grpc {
namespace testing { namespace testing {
#define GRPC_ARG_LABELS_TO_INJECT "grpc.testing.labels_to_inject"
// A subchannel filter that adds the service labels for test to the
// CallAttemptTracer in a call.
class AddServiceLabelsFilter : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<AddServiceLabelsFilter> Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
return AddServiceLabelsFilter(
args.GetPointer<const std::map<std::string, std::string>>(
GRPC_ARG_LABELS_TO_INJECT));
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override {
using CallAttemptTracer = grpc_core::ClientCallTracer::CallAttemptTracer;
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<CallAttemptTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
EXPECT_NE(call_tracer, nullptr);
call_tracer->AddOptionalLabels(
CallAttemptTracer::OptionalLabelComponent::kXdsServiceLabels,
std::make_shared<std::map<std::string, std::string>>(
*labels_to_inject_));
return next_promise_factory(std::move(call_args));
}
private:
explicit AddServiceLabelsFilter(
const std::map<std::string, std::string>* labels_to_inject)
: labels_to_inject_(labels_to_inject) {}
const std::map<std::string, std::string>* labels_to_inject_;
};
const grpc_channel_filter AddServiceLabelsFilter::kFilter =
grpc_core::MakePromiseBasedFilter<AddServiceLabelsFilter,
grpc_core::FilterEndpoint::kClient>(
"add_service_labels_filter");
void OpenTelemetryPluginEnd2EndTest::Init( void OpenTelemetryPluginEnd2EndTest::Init(
const absl::flat_hash_set<absl::string_view>& metric_names, const absl::flat_hash_set<absl::string_view>& metric_names,
opentelemetry::sdk::resource::Resource resource, opentelemetry::sdk::resource::Resource resource,
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector, std::unique_ptr<grpc::internal::LabelsInjector> labels_injector,
bool test_no_meter_provider, bool test_no_meter_provider,
const std::map<std::string, std::string>& labels_to_inject,
absl::AnyInvocable<bool(absl::string_view /*target*/) const> absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector, target_selector,
absl::AnyInvocable<bool(absl::string_view /*target*/) const> absl::AnyInvocable<bool(absl::string_view /*target*/) const>
@ -83,6 +128,16 @@ void OpenTelemetryPluginEnd2EndTest::Init(
ot_builder.AddPluginOption(std::move(option)); ot_builder.AddPluginOption(std::move(option));
} }
ot_builder.BuildAndRegisterGlobal(); ot_builder.BuildAndRegisterGlobal();
ChannelArguments channel_args;
if (!labels_to_inject.empty()) {
labels_to_inject_ = labels_to_inject;
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) mutable {
builder->channel_init()->RegisterFilter(
GRPC_CLIENT_SUBCHANNEL, &AddServiceLabelsFilter::kFilter);
});
channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
}
grpc_init(); grpc_init();
grpc::ServerBuilder builder; grpc::ServerBuilder builder;
int port; int port;
@ -96,8 +151,8 @@ void OpenTelemetryPluginEnd2EndTest::Init(
server_address_ = absl::StrCat("localhost:", port); server_address_ = absl::StrCat("localhost:", port);
canonical_server_address_ = absl::StrCat("dns:///", server_address_); canonical_server_address_ = absl::StrCat("dns:///", server_address_);
auto channel = auto channel = grpc::CreateCustomChannel(
grpc::CreateChannel(server_address_, grpc::InsecureChannelCredentials()); server_address_, grpc::InsecureChannelCredentials(), channel_args);
stub_ = EchoTestService::NewStub(channel); stub_ = EchoTestService::NewStub(channel);
generic_stub_ = std::make_unique<GenericStub>(std::move(channel)); generic_stub_ = std::make_unique<GenericStub>(std::move(channel));
} }

@ -63,6 +63,7 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
opentelemetry::sdk::resource::Resource::Create({}), opentelemetry::sdk::resource::Resource::Create({}),
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector = nullptr, std::unique_ptr<grpc::internal::LabelsInjector> labels_injector = nullptr,
bool test_no_meter_provider = false, bool test_no_meter_provider = false,
const std::map<std::string, std::string>& labels_to_inject = {},
absl::AnyInvocable<bool(absl::string_view /*target*/) const> absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_selector = absl::AnyInvocable<bool(absl::string_view) const>(), target_selector = absl::AnyInvocable<bool(absl::string_view) const>(),
absl::AnyInvocable<bool(absl::string_view /*target*/) const> absl::AnyInvocable<bool(absl::string_view /*target*/) const>
@ -94,6 +95,7 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo"; const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo";
const absl::string_view kGenericMethodName = "foo/bar"; const absl::string_view kGenericMethodName = "foo/bar";
std::map<std::string, std::string> labels_to_inject_;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_; std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_;
std::string server_address_; std::string server_address_;
std::string canonical_server_address_; std::string canonical_server_address_;

Loading…
Cancel
Save