From 70839a9b190c3105d360c8f7f9dc505dc91ca948 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 8 Apr 2024 15:50:04 -0700 Subject: [PATCH] [OTel C++] Add experimental optional locality label available to client per-attempt metrics (#36254) As per https://github.com/grpc/proposal/pull/419, the experimental optional label `grpc.lb.locality` is added to the follow per-call metrics - * grpc.client.attempt.duration * grpc.client.attempt.sent_total_compressed_message_size * grpc.client.attempt.rcvd_total_compressed_message_size Closes #36254 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36254 from yashykt:OTelOptionalLabelsOnPerCall c5390c99a11c854bb15bb86434d38ec7329719e8 PiperOrigin-RevId: 622973959 --- BUILD | 3 + src/core/BUILD | 3 + src/core/ext/xds/xds_client.cc | 2 +- src/core/ext/xds/xds_client_stats.cc | 4 +- src/core/ext/xds/xds_client_stats.h | 18 +- src/core/ext/xds/xds_cluster.cc | 18 +- src/core/ext/xds/xds_cluster.h | 7 +- src/core/ext/xds/xds_endpoint.cc | 5 +- src/core/lib/channel/call_tracer.cc | 7 +- src/core/lib/channel/call_tracer.h | 21 +- src/core/load_balancing/xds/cds.cc | 8 +- .../load_balancing/xds/xds_cluster_impl.cc | 57 +++--- .../load_balancing/xds/xds_wrr_locality.cc | 11 +- .../resolver/xds/xds_dependency_manager.cc | 7 +- src/cpp/ext/csm/metadata_exchange.cc | 48 ++--- src/cpp/ext/csm/metadata_exchange.h | 9 +- .../filters/census/open_census_call_tracer.h | 5 +- src/cpp/ext/otel/key_value_iterable.h | 28 ++- src/cpp/ext/otel/otel_client_call_tracer.cc | 18 +- src/cpp/ext/otel/otel_client_call_tracer.h | 13 +- src/cpp/ext/otel/otel_plugin.cc | 46 ++++- src/cpp/ext/otel/otel_plugin.h | 26 ++- src/cpp/ext/otel/otel_server_call_tracer.cc | 2 +- .../grpc_observability/client_call_tracer.h | 7 +- test/core/end2end/tests/http2_stats.cc | 6 +- test/core/util/fake_stats_plugin.h | 16 +- .../xds/xds_cluster_resource_type_test.cc | 25 ++- .../end2end/xds/xds_cluster_end2end_test.cc | 47 ++--- test/cpp/ext/csm/metadata_exchange_test.cc | 16 +- test/cpp/ext/otel/otel_plugin_test.cc | 182 +++++++++++++++++- test/cpp/ext/otel/otel_test_library.cc | 41 ++-- test/cpp/ext/otel/otel_test_library.h | 14 +- 32 files changed, 480 insertions(+), 240 deletions(-) diff --git a/BUILD b/BUILD index cc6170ecf57..417d9f21a94 100644 --- a/BUILD +++ b/BUILD @@ -1708,6 +1708,7 @@ grpc_cc_library( external_deps = [ "absl/status", "absl/strings", + "absl/types:optional", ], language = "c++", visibility = ["@grpc:alt_grpc_base_legacy"], @@ -1721,6 +1722,7 @@ grpc_cc_library( "//src/core:context", "//src/core:error", "//src/core:metadata_batch", + "//src/core:ref_counted_string", "//src/core:slice_buffer", ], ) @@ -4269,6 +4271,7 @@ grpc_cc_library( visibility = ["@grpc:xds_client_core"], deps = [ "backoff", + "call_tracer", "debug_location", "endpoint_addresses", "envoy_admin_upb", diff --git a/src/core/BUILD b/src/core/BUILD index e9c3409ee46..7ab4b2b3f2c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5348,10 +5348,12 @@ grpc_cc_library( "match", "pollset_set", "ref_counted", + "ref_counted_string", "resolved_address", "subchannel_interface", "validation_errors", "xds_dependency_manager", + "//:call_tracer", "//:config", "//:debug_location", "//:endpoint_addresses", @@ -5433,6 +5435,7 @@ grpc_cc_library( "lb_policy_factory", "lb_policy_registry", "pollset_set", + "ref_counted_string", "validation_errors", "//:config", "//:debug_location", diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 40301a60d34..e239c8cc307 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -2026,7 +2026,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked( "[xds_client %p] cluster=%s eds_service_name=%s " "locality=%s locality_stats=%p", this, cluster_key.first.c_str(), cluster_key.second.c_str(), - locality_name->AsHumanReadableString().c_str(), + locality_name->human_readable_string().c_str(), locality_state.locality_stats); } } diff --git a/src/core/ext/xds/xds_client_stats.cc b/src/core/ext/xds/xds_client_stats.cc index bb29f906f0b..3d4730a4965 100644 --- a/src/core/ext/xds/xds_client_stats.cc +++ b/src/core/ext/xds/xds_client_stats.cc @@ -111,7 +111,7 @@ XdsClusterLocalityStats::XdsClusterLocalityStats( xds_client_.get(), this, std::string(lrs_server_).c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str(), - name_->AsHumanReadableString().c_str()); + name_->human_readable_string().c_str()); } } @@ -122,7 +122,7 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() { xds_client_.get(), this, std::string(lrs_server_).c_str(), std::string(cluster_name_).c_str(), std::string(eds_service_name_).c_str(), - name_->AsHumanReadableString().c_str()); + name_->human_readable_string().c_str()); } xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_, eds_service_name_, name_, this); diff --git a/src/core/ext/xds/xds_client_stats.h b/src/core/ext/xds/xds_client_stats.h index 8f15e25d928..6cffcb76ba5 100644 --- a/src/core/ext/xds/xds_client_stats.h +++ b/src/core/ext/xds/xds_client_stats.h @@ -32,6 +32,7 @@ #include "absl/strings/string_view.h" #include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/per_cpu.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -64,10 +65,9 @@ class XdsLocalityName final : public RefCounted { : region_(std::move(region)), zone_(std::move(zone)), sub_zone_(std::move(sub_zone)), - locality_labels_( - std::make_shared>()) { - locality_labels_->emplace("grpc.lb.locality", AsHumanReadableString()); - } + human_readable_string_( + absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", + region_, zone_, sub_zone_)) {} bool operator==(const XdsLocalityName& other) const { return region_ == other.region_ && zone_ == other.zone_ && @@ -89,13 +89,9 @@ class XdsLocalityName final : public RefCounted { const std::string& region() const { return region_; } const std::string& zone() const { return zone_; } const std::string& sub_zone() const { return sub_zone_; } - std::shared_ptr> locality_labels() const { - return locality_labels_; - } - std::string AsHumanReadableString() const { - return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", - region_, zone_, sub_zone_); + const RefCountedStringValue& human_readable_string() const { + return human_readable_string_; } // Channel args traits. @@ -111,7 +107,7 @@ class XdsLocalityName final : public RefCounted { std::string region_; std::string zone_; std::string sub_zone_; - std::shared_ptr> locality_labels_; + RefCountedStringValue human_readable_string_; }; // Drop stats for an xds cluster. diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index a12b20d8252..572ce831c53 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -714,8 +714,6 @@ absl::StatusOr> CdsResourceParse( StdStringToUpbString( absl::string_view("com.google.csm.telemetry_labels")), &telemetry_labels_struct)) { - auto telemetry_labels = - std::make_shared>(); size_t iter = kUpb_Map_Begin; const google_protobuf_Struct_FieldsEntry* fields_entry; while ((fields_entry = google_protobuf_Struct_fields_next( @@ -724,15 +722,17 @@ absl::StatusOr> CdsResourceParse( const google_protobuf_Value* value = google_protobuf_Struct_FieldsEntry_value(fields_entry); if (google_protobuf_Value_has_string_value(value)) { - telemetry_labels->emplace( - UpbStringToStdString( - google_protobuf_Struct_FieldsEntry_key(fields_entry)), - UpbStringToStdString(google_protobuf_Value_string_value(value))); + if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( + fields_entry)) == "service_name") { + cds_update->service_telemetry_label = RefCountedStringValue( + UpbStringToAbsl(google_protobuf_Value_string_value(value))); + } else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key( + fields_entry)) == "service_namespace") { + cds_update->namespace_telemetry_label = RefCountedStringValue( + UpbStringToAbsl(google_protobuf_Value_string_value(value))); + } } } - if (!telemetry_labels->empty()) { - cds_update->telemetry_labels = std::move(telemetry_labels); - } } } // Return result. diff --git a/src/core/ext/xds/xds_cluster.h b/src/core/ext/xds/xds_cluster.h index a8bf915a22c..445c443a8e5 100644 --- a/src/core/ext/xds/xds_cluster.h +++ b/src/core/ext/xds/xds_cluster.h @@ -101,7 +101,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { XdsHealthStatusSet override_host_statuses; - std::shared_ptr> telemetry_labels; + RefCountedStringValue service_telemetry_label; + RefCountedStringValue namespace_telemetry_label; bool operator==(const XdsClusterResource& other) const { return type == other.type && lb_policy_config == other.lb_policy_config && @@ -110,7 +111,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData { connection_idle_timeout == other.connection_idle_timeout && max_concurrent_requests == other.max_concurrent_requests && outlier_detection == other.outlier_detection && - override_host_statuses == other.override_host_statuses; + override_host_statuses == other.override_host_statuses && + service_telemetry_label == other.service_telemetry_label && + namespace_telemetry_label == other.namespace_telemetry_label; } std::string ToString() const; diff --git a/src/core/ext/xds/xds_endpoint.cc b/src/core/ext/xds/xds_endpoint.cc index 761d54969e1..18236f51720 100644 --- a/src/core/ext/xds/xds_endpoint.cc +++ b/src/core/ext/xds/xds_endpoint.cc @@ -81,7 +81,7 @@ std::string XdsEndpointResource::Priority::Locality::ToString() const { for (const EndpointAddresses& endpoint : endpoints) { endpoint_strings.emplace_back(endpoint.ToString()); } - return absl::StrCat("{name=", name->AsHumanReadableString(), + return absl::StrCat("{name=", name->human_readable_string().as_string_view(), ", lb_weight=", lb_weight, ", endpoints=[", absl::StrJoin(endpoint_strings, ", "), "]}"); } @@ -420,7 +420,8 @@ absl::StatusOr> EdsResourceParse( if (it != locality_map.end()) { errors.AddError(absl::StrCat( "duplicate locality ", - parsed_locality->locality.name->AsHumanReadableString(), + parsed_locality->locality.name->human_readable_string() + .as_string_view(), " found in priority ", parsed_locality->priority)); } else { locality_map.emplace(parsed_locality->locality.name.get(), diff --git a/src/core/lib/channel/call_tracer.cc b/src/core/lib/channel/call_tracer.cc index 83a8ec8b22c..47782406b1f 100644 --- a/src/core/lib/channel/call_tracer.cc +++ b/src/core/lib/channel/call_tracer.cc @@ -151,11 +151,10 @@ class DelegatingClientCallTracer : public ClientCallTracer { std::shared_ptr StartNewTcpTrace() override { return nullptr; } - void AddOptionalLabels( - OptionalLabelComponent component, - std::shared_ptr> labels) override { + void SetOptionalLabel(OptionalLabelKey key, + RefCountedStringValue value) override { for (auto* tracer : tracers_) { - tracer->AddOptionalLabels(component, labels); + tracer->SetOptionalLabel(key, value); } } std::string TraceId() override { return tracers_[0]->TraceId(); } diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index e2be3d1a448..7c4d2447df4 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -26,12 +26,14 @@ #include "absl/status/status.h" #include "absl/strings/string_view.h" +#include "absl/types/optional.h" #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/channel/tcp_tracer.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" @@ -126,13 +128,16 @@ class ClientCallTracer : public CallTracerAnnotationInterface { // as transparent retry attempts.) class CallAttemptTracer : public CallTracerInterface { public: - enum class OptionalLabelComponent : std::uint8_t { - kXdsServiceLabels, - kXdsLocalityLabels, - kSize, // keep last + // Note that not all of the optional label keys are exposed as public API. + enum class OptionalLabelKey : std::uint8_t { + kXdsServiceName, // not public + kXdsServiceNamespace, // not public + kLocality, + kSize // should be last }; ~CallAttemptTracer() override {} + // TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata` // and `RecordEnd` should be moved into CallTracerInterface. // If the call was cancelled before the recv_trailing_metadata op @@ -145,10 +150,10 @@ class ClientCallTracer : public CallTracerAnnotationInterface { // library is free to destroy the object. virtual void RecordEnd(const gpr_timespec& latency) = 0; - // Adds optional labels to be reported by the underlying tracer in a call. - virtual void AddOptionalLabels( - OptionalLabelComponent component, - std::shared_ptr> labels) = 0; + // Sets an optional label on the per-attempt metrics recorded at the end of + // the attempt. + virtual void SetOptionalLabel(OptionalLabelKey key, + RefCountedStringValue value) = 0; }; ~ClientCallTracer() override {} diff --git a/src/core/load_balancing/xds/cds.cc b/src/core/load_balancing/xds/cds.cc index a46c1e8b488..4a7c07b41ed 100644 --- a/src/core/load_balancing/xds/cds.cc +++ b/src/core/load_balancing/xds/cds.cc @@ -37,9 +37,6 @@ #include #include -#include "src/core/load_balancing/address_filtering.h" -#include "src/core/load_balancing/outlier_detection/outlier_detection.h" -#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_cluster.h" #include "src/core/ext/xds/xds_common_types.h" #include "src/core/ext/xds/xds_health_status.h" @@ -59,10 +56,13 @@ #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/json/json_writer.h" +#include "src/core/load_balancing/address_filtering.h" #include "src/core/load_balancing/delegating_helper.h" #include "src/core/load_balancing/lb_policy.h" #include "src/core/load_balancing/lb_policy_factory.h" #include "src/core/load_balancing/lb_policy_registry.h" +#include "src/core/load_balancing/outlier_detection/outlier_detection.h" +#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/resolver/xds/xds_dependency_manager.h" namespace grpc_core { @@ -254,7 +254,7 @@ class PriorityEndpointIterator final : public EndpointAddressesIterator { const auto& locality = p.second; std::vector hierarchical_path = { RefCountedStringValue(priority_child_name), - RefCountedStringValue(locality_name->AsHumanReadableString())}; + locality_name->human_readable_string()}; auto hierarchical_path_attr = MakeRefCounted(std::move(hierarchical_path)); for (const auto& endpoint : locality.endpoints) { diff --git a/src/core/load_balancing/xds/xds_cluster_impl.cc b/src/core/load_balancing/xds/xds_cluster_impl.cc index 0f03857ce21..ef766148021 100644 --- a/src/core/load_balancing/xds/xds_cluster_impl.cc +++ b/src/core/load_balancing/xds/xds_cluster_impl.cc @@ -38,15 +38,13 @@ #include #include "src/core/client_channel/client_channel_internal.h" -#include "src/core/load_balancing/backend_metric_data.h" -#include "src/core/load_balancing/child_policy_handler.h" -#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_bootstrap_grpc.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_client_grpc.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/ext/xds/xds_endpoint.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" @@ -55,6 +53,7 @@ #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/pollset_set.h" @@ -64,11 +63,14 @@ #include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/security/credentials/xds/xds_credentials.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/load_balancing/backend_metric_data.h" +#include "src/core/load_balancing/child_policy_handler.h" #include "src/core/load_balancing/delegating_helper.h" #include "src/core/load_balancing/lb_policy.h" #include "src/core/load_balancing/lb_policy_factory.h" #include "src/core/load_balancing/lb_policy_registry.h" #include "src/core/load_balancing/subchannel_interface.h" +#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/resolver/endpoint_addresses.h" #include "src/core/resolver/xds/xds_dependency_manager.h" @@ -78,8 +80,6 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb"); namespace { -using OptionalLabelComponent = - ClientCallTracer::CallAttemptTracer::OptionalLabelComponent; using XdsConfig = XdsDependencyManager::XdsConfig; // @@ -193,11 +193,11 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { class StatsSubchannelWrapper final : public DelegatingSubchannel { public: // If load reporting is enabled and we have an XdsClusterLocalityStats - // object, that object already contains the locality labels. We - // need to store the locality labels directly only in the case where + // object, that object already contains the locality label. We + // need to store the locality label directly only in the case where // load reporting is disabled. using LocalityData = absl::variant< - std::shared_ptr> /*locality_labels*/, + RefCountedStringValue /*locality*/, RefCountedPtr /*locality_stats*/>; StatsSubchannelWrapper( @@ -206,23 +206,19 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { : DelegatingSubchannel(std::move(wrapped_subchannel)), locality_data_(std::move(locality_data)) {} - std::shared_ptr> locality_labels() - const { + RefCountedStringValue locality() const { return Match( locality_data_, - [](const std::shared_ptr>& - locality_labels) { - return locality_labels; - }, + [](RefCountedStringValue locality) { return locality; }, [](const RefCountedPtr& locality_stats) { - return locality_stats->locality_name()->locality_labels(); + return locality_stats->locality_name()->human_readable_string(); }); } XdsClusterLocalityStats* locality_stats() const { return Match( locality_data_, - [](const std::shared_ptr>&) { + [](const RefCountedStringValue&) { return static_cast(nullptr); }, [](const RefCountedPtr& locality_stats) { @@ -247,7 +243,8 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { RefCountedPtr call_counter_; uint32_t max_concurrent_requests_; - std::shared_ptr> service_labels_; + RefCountedStringValue service_telemetry_label_; + RefCountedStringValue namespace_telemetry_label_; RefCountedPtr drop_config_; RefCountedPtr drop_stats_; RefCountedPtr picker_; @@ -391,7 +388,10 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb, : call_counter_(xds_cluster_impl_lb->call_counter_), max_concurrent_requests_( xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests), - service_labels_(xds_cluster_impl_lb->cluster_resource_->telemetry_labels), + service_telemetry_label_( + xds_cluster_impl_lb->cluster_resource_->service_telemetry_label), + namespace_telemetry_label_( + xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label), drop_config_(xds_cluster_impl_lb->drop_config_), drop_stats_(xds_cluster_impl_lb->drop_stats_), picker_(std::move(picker)) { @@ -406,8 +406,13 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( auto* call_state = static_cast(args.call_state); auto* call_attempt_tracer = call_state->GetCallAttemptTracer(); if (call_attempt_tracer != nullptr) { - call_attempt_tracer->AddOptionalLabels( - OptionalLabelComponent::kXdsServiceLabels, service_labels_); + call_attempt_tracer->SetOptionalLabel( + ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName, + service_telemetry_label_); + call_attempt_tracer->SetOptionalLabel( + ClientCallTracer::CallAttemptTracer::OptionalLabelKey:: + kXdsServiceNamespace, + namespace_telemetry_label_); } // Handle EDS drops. const std::string* drop_category; @@ -436,11 +441,11 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( if (complete_pick != nullptr) { auto* subchannel_wrapper = static_cast(complete_pick->subchannel.get()); - // Add locality labels to per-call metrics if needed. + // Add locality label to per-call metrics if needed. if (call_attempt_tracer != nullptr) { - call_attempt_tracer->AddOptionalLabels( - OptionalLabelComponent::kXdsLocalityLabels, - subchannel_wrapper->locality_labels()); + call_attempt_tracer->SetOptionalLabel( + ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality, + subchannel_wrapper->locality()); } // Handle load reporting. RefCountedPtr locality_stats; @@ -779,7 +784,7 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( const grpc_resolved_address& address, const ChannelArgs& per_address_args, const ChannelArgs& args) { if (parent()->shutting_down_) return nullptr; - // Wrap the subchannel so that we pass along the locality labels and + // Wrap the subchannel so that we pass along the locality label and // (if load reporting is enabled) the locality stats object, which // will be used by the picker. auto locality_name = per_address_args.GetObjectRef(); @@ -806,7 +811,7 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( if (locality_stats != nullptr) { locality_data = std::move(locality_stats); } else { - locality_data = locality_name->locality_labels(); + locality_data = locality_name->human_readable_string(); } return MakeRefCounted( parent()->channel_control_helper()->CreateSubchannel( diff --git a/src/core/load_balancing/xds/xds_wrr_locality.cc b/src/core/load_balancing/xds/xds_wrr_locality.cc index de2264a0389..2a773941983 100644 --- a/src/core/load_balancing/xds/xds_wrr_locality.cc +++ b/src/core/load_balancing/xds/xds_wrr_locality.cc @@ -32,7 +32,6 @@ #include #include -#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client_stats.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" @@ -40,6 +39,7 @@ #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/ref_counted_string.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/json/json.h" @@ -50,6 +50,7 @@ #include "src/core/load_balancing/lb_policy.h" #include "src/core/load_balancing/lb_policy_factory.h" #include "src/core/load_balancing/lb_policy_registry.h" +#include "src/core/load_balancing/xds/xds_channel_args.h" #include "src/core/resolver/endpoint_addresses.h" namespace grpc_core { @@ -167,7 +168,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { } auto config = args.config.TakeAsSubclass(); // Scan the addresses to find the weight for each locality. - std::map locality_weights; + std::map locality_weights; if (args.addresses.ok()) { (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { auto* locality_name = endpoint.args().GetObject(); @@ -175,7 +176,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0); if (locality_name != nullptr && weight > 0) { auto p = locality_weights.emplace( - locality_name->AsHumanReadableString(), weight); + locality_name->human_readable_string(), weight); if (!p.second && p.first->second != weight) { gpr_log(GPR_ERROR, "INTERNAL ERROR: xds_wrr_locality found different weights " @@ -188,10 +189,10 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { // Construct the config for the weighted_target policy. Json::Object weighted_targets; for (const auto& p : locality_weights) { - const std::string& locality_name = p.first; + absl::string_view locality_name = p.first.as_string_view(); uint32_t weight = p.second; // Add weighted target entry. - weighted_targets[locality_name] = Json::FromObject({ + weighted_targets[std::string(locality_name)] = Json::FromObject({ {"weight", Json::FromNumber(weight)}, {"childPolicy", config->child_config()}, }); diff --git a/src/core/resolver/xds/xds_dependency_manager.cc b/src/core/resolver/xds/xds_dependency_manager.cc index 49a4ecb2a60..b9e37bca0bf 100644 --- a/src/core/resolver/xds/xds_dependency_manager.cc +++ b/src/core/resolver/xds/xds_dependency_manager.cc @@ -18,6 +18,8 @@ #include "src/core/resolver/xds/xds_dependency_manager.h" +#include + #include "absl/strings/str_join.h" #include "src/core/ext/xds/xds_routing.h" @@ -635,11 +637,12 @@ void XdsDependencyManager::OnEndpointUpdate( it->second.update.resolution_note = absl::StrCat("EDS resource ", name, " contains no localities"); } else { - std::set empty_localities; + std::set empty_localities; for (const auto& priority : endpoint->priorities) { for (const auto& p : priority.localities) { if (p.second.endpoints.empty()) { - empty_localities.insert(p.first->AsHumanReadableString()); + empty_localities.insert( + p.first->human_readable_string().as_string_view()); } } } diff --git a/src/cpp/ext/csm/metadata_exchange.cc b/src/cpp/ext/csm/metadata_exchange.cc index c4908efe85e..2338499e415 100644 --- a/src/cpp/ext/csm/metadata_exchange.cc +++ b/src/cpp/ext/csm/metadata_exchange.cc @@ -52,8 +52,8 @@ namespace grpc { namespace internal { -using OptionalLabelComponent = - grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent; +using OptionalLabelKey = + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey; namespace { @@ -428,8 +428,7 @@ void ServiceMeshLabelsInjector::AddLabels( bool ServiceMeshLabelsInjector::AddOptionalLabels( bool is_client, - absl::Span>> - optional_labels_span, + absl::Span optional_labels, opentelemetry::nostd::function_ref< bool(opentelemetry::nostd::string_view, opentelemetry::common::AttributeValue)> @@ -438,35 +437,26 @@ bool ServiceMeshLabelsInjector::AddOptionalLabels( // Currently the CSM optional labels are only set on client. return true; } - // According to the CSM Observability Metric spec, if the control plane fails - // to provide these labels, the client will set their values to "unknown". - // These default values are set below. - absl::string_view service_name = "unknown"; - absl::string_view service_namespace = "unknown"; // Performs JSON label name format to CSM Observability Metric spec format // conversion. - if (optional_labels_span.size() > - static_cast(OptionalLabelComponent::kXdsServiceLabels)) { - const auto& optional_labels = optional_labels_span[static_cast( - OptionalLabelComponent::kXdsServiceLabels)]; - if (optional_labels != nullptr) { - auto it = optional_labels->find("service_name"); - if (it != optional_labels->end()) service_name = it->second; - it = optional_labels->find("service_namespace"); - if (it != optional_labels->end()) service_namespace = it->second; - } - } + absl::string_view service_name = + optional_labels[static_cast( + grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kXdsServiceName)] + .as_string_view(); + absl::string_view service_namespace = + optional_labels[static_cast( + grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kXdsServiceNamespace)] + .as_string_view(); return callback("csm.service_name", - AbslStrViewToOpenTelemetryStrView(service_name)) && + service_name.empty() + ? "unknown" + : AbslStrViewToOpenTelemetryStrView(service_name)) && callback("csm.service_namespace_name", - AbslStrViewToOpenTelemetryStrView(service_namespace)); -} - -size_t ServiceMeshLabelsInjector::GetOptionalLabelsSize( - bool is_client, - absl::Span>>) - const { - return is_client ? 2 : 0; + service_namespace.empty() + ? "unknown" + : AbslStrViewToOpenTelemetryStrView(service_namespace)); } } // namespace internal diff --git a/src/cpp/ext/csm/metadata_exchange.h b/src/cpp/ext/csm/metadata_exchange.h index 560a07f1e97..eac78747f11 100644 --- a/src/cpp/ext/csm/metadata_exchange.h +++ b/src/cpp/ext/csm/metadata_exchange.h @@ -55,8 +55,7 @@ class ServiceMeshLabelsInjector : public LabelsInjector { // Add optional labels to the traced calls. bool AddOptionalLabels( bool is_client, - absl::Span>> - optional_labels_span, + absl::Span optional_labels, opentelemetry::nostd::function_ref< bool(opentelemetry::nostd::string_view, opentelemetry::common::AttributeValue)> @@ -65,8 +64,10 @@ class ServiceMeshLabelsInjector : public LabelsInjector { // Gets the size of the actual optional labels. size_t GetOptionalLabelsSize( bool is_client, - absl::Span>> - optional_labels_span) const override; + absl::Span /*optional_labels*/) + const override { + return is_client ? 2 : 0; + } const std::vector>& TestOnlyLocalLabels() const { diff --git a/src/cpp/ext/filters/census/open_census_call_tracer.h b/src/cpp/ext/filters/census/open_census_call_tracer.h index 235a19ff6a5..744e44cb2cf 100644 --- a/src/cpp/ext/filters/census/open_census_call_tracer.h +++ b/src/cpp/ext/filters/census/open_census_call_tracer.h @@ -103,9 +103,8 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer { void RecordAnnotation(absl::string_view annotation) override; void RecordAnnotation(const Annotation& annotation) override; std::shared_ptr StartNewTcpTrace() override; - void AddOptionalLabels( - OptionalLabelComponent, - std::shared_ptr>) override {} + void SetOptionalLabel(OptionalLabelKey, + grpc_core::RefCountedStringValue) override {} experimental::CensusContext* context() { return &context_; } diff --git a/src/cpp/ext/otel/key_value_iterable.h b/src/cpp/ext/otel/key_value_iterable.h index 89c158f2016..d515c211483 100644 --- a/src/cpp/ext/otel/key_value_iterable.h +++ b/src/cpp/ext/otel/key_value_iterable.h @@ -56,14 +56,13 @@ class OpenTelemetryPlugin::KeyValueIterable additional_labels, const OpenTelemetryPlugin::ActivePluginOptionsView* active_plugin_options_view, - absl::Span>> - optional_labels_span, + absl::Span optional_labels, bool is_client, const OpenTelemetryPlugin* otel_plugin) : injected_labels_from_plugin_options_( injected_labels_from_plugin_options), additional_labels_(additional_labels), active_plugin_options_view_(active_plugin_options_view), - optional_labels_(optional_labels_span), + optional_labels_(optional_labels), is_client_(is_client), otel_plugin_(otel_plugin) {} @@ -100,6 +99,26 @@ class OpenTelemetryPlugin::KeyValueIterable return false; } } + // Add per-call optional labels + if (!optional_labels_.empty()) { + GPR_ASSERT( + optional_labels_.size() == + static_cast(grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kSize)); + for (size_t i = 0; i < optional_labels_.size(); ++i) { + if (!otel_plugin_->per_call_optional_label_bits_.test(i)) { + continue; + } + if (!callback( + AbslStrViewToOpenTelemetryStrView(OptionalLabelKeyToString( + static_cast(i))), + AbslStrViewToOpenTelemetryStrView( + optional_labels_[i].as_string_view()))) { + return false; + } + } + } return true; } @@ -132,8 +151,7 @@ class OpenTelemetryPlugin::KeyValueIterable additional_labels_; const OpenTelemetryPlugin::ActivePluginOptionsView* active_plugin_options_view_; - absl::Span>> - optional_labels_; + absl::Span optional_labels_; bool is_client_; const OpenTelemetryPlugin* otel_plugin_; }; diff --git a/src/cpp/ext/otel/otel_client_call_tracer.cc b/src/cpp/ext/otel/otel_client_call_tracer.cc index 8dafe16db62..17ab0d816c8 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.cc +++ b/src/cpp/ext/otel/otel_client_call_tracer.cc @@ -82,8 +82,8 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer( 1, KeyValueIterable( /*injected_labels_from_plugin_options=*/{}, additional_labels, /*active_plugin_options_view=*/nullptr, - /*optional_labels_span=*/{}, /*is_client=*/true, - parent_->otel_plugin_)); + /*optional_labels=*/{}, + /*is_client=*/true, parent_->otel_plugin_)); } } @@ -156,8 +156,8 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: static_cast(status.code()))}}}; KeyValueIterable labels( injected_labels_from_plugin_options_, additional_labels, - &parent_->scope_config_->active_plugin_options_view(), - optional_labels_array_, /*is_client=*/true, parent_->otel_plugin_); + &parent_->scope_config_->active_plugin_options_view(), optional_labels_, + /*is_client=*/true, parent_->otel_plugin_); if (parent_->otel_plugin_->client_.attempt.duration != nullptr) { parent_->otel_plugin_->client_.attempt.duration->Record( absl::ToDoubleSeconds(absl::Now() - start_time_), labels, @@ -209,12 +209,10 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() { return nullptr; } -void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: - AddOptionalLabels( - OptionalLabelComponent component, - std::shared_ptr> optional_labels) { - optional_labels_array_[static_cast(component)] = - std::move(optional_labels); +void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel( + OptionalLabelKey key, grpc_core::RefCountedStringValue value) { + GPR_ASSERT(key < OptionalLabelKey::kSize); + optional_labels_[static_cast(key)] = std::move(value); } // diff --git a/src/cpp/ext/otel/otel_client_call_tracer.h b/src/cpp/ext/otel/otel_client_call_tracer.h index e88aeac16a5..2d9c255c226 100644 --- a/src/cpp/ext/otel/otel_client_call_tracer.h +++ b/src/cpp/ext/otel/otel_client_call_tracer.h @@ -92,9 +92,8 @@ class OpenTelemetryPlugin::ClientCallTracer void RecordAnnotation(absl::string_view /*annotation*/) override; void RecordAnnotation(const Annotation& /*annotation*/) override; std::shared_ptr StartNewTcpTrace() override; - void AddOptionalLabels(OptionalLabelComponent component, - std::shared_ptr> - optional_labels) override; + void SetOptionalLabel(OptionalLabelKey key, + grpc_core::RefCountedStringValue value) override; private: const ClientCallTracer* parent_; @@ -102,10 +101,10 @@ class OpenTelemetryPlugin::ClientCallTracer // Start time (for measuring latency). absl::Time start_time_; std::unique_ptr injected_labels_; - // The indices of the array correspond to the OptionalLabelComponent enum. - std::array>, - static_cast(OptionalLabelComponent::kSize)> - optional_labels_array_; + // Avoid std::map to avoid per-call allocations. + std::array(OptionalLabelKey::kSize)> + optional_labels_; std::vector> injected_labels_from_plugin_options_; }; diff --git a/src/cpp/ext/otel/otel_plugin.cc b/src/cpp/ext/otel/otel_plugin.cc index 4b9e69892b3..aac4dd0222c 100644 --- a/src/cpp/ext/otel/otel_plugin.cc +++ b/src/cpp/ext/otel/otel_plugin.cc @@ -214,10 +214,7 @@ OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption( OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddOptionalLabel( absl::string_view optional_label_key) { - if (optional_label_keys_ == nullptr) { - optional_label_keys_ = std::make_shared>(); - } - optional_label_keys_->emplace(optional_label_key); + optional_label_keys_.emplace(optional_label_key); return *this; } @@ -340,7 +337,7 @@ OpenTelemetryPlugin::OpenTelemetryPlugin( server_selector, std::vector> plugin_options, - std::shared_ptr> optional_label_keys, + const std::set& optional_label_keys, absl::AnyInvocable< bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> channel_scope_filter) @@ -423,6 +420,17 @@ OpenTelemetryPlugin::OpenTelemetryPlugin( kServerCallRcvdTotalCompressedMessageSizeInstrumentName), "Compressed message bytes received per server call", "By"); } + // Store optional label keys for per call metrics + GPR_ASSERT(static_cast( + grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kSize) <= kOptionalLabelsSizeLimit); + for (const auto& key : optional_label_keys) { + auto optional_key = OptionalLabelStringToKey(key); + if (optional_key.has_value()) { + per_call_optional_label_bits_.set( + static_cast(optional_key.value())); + } + } // Non-per-call metrics. grpc_core::GlobalInstrumentsRegistry::ForEach( [&, this](const grpc_core::GlobalInstrumentsRegistry:: @@ -527,14 +535,38 @@ OpenTelemetryPlugin::OpenTelemetryPlugin( descriptor.instrument_type)); } for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) { - if (optional_label_keys->find(descriptor.optional_label_keys[i]) != - optional_label_keys->end()) { + if (optional_label_keys.find(descriptor.optional_label_keys[i]) != + optional_label_keys.end()) { instruments_data_[descriptor.index].optional_labels_bits.set(i); } } }); } +namespace { +constexpr absl::string_view kLocality = "grpc.lb.locality"; +} + +absl::string_view OpenTelemetryPlugin::OptionalLabelKeyToString( + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key) { + switch (key) { + case grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey:: + kLocality: + return kLocality; + default: + grpc_core::Crash("Illegal OptionalLabelKey index"); + } +} + +absl::optional +OpenTelemetryPlugin::OptionalLabelStringToKey(absl::string_view key) { + if (key == kLocality) { + return grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey:: + kLocality; + } + return absl::nullopt; +} + std::pair> OpenTelemetryPlugin::IsEnabledForChannel( const OpenTelemetryPluginBuilder::ChannelScope& scope) const { diff --git a/src/cpp/ext/otel/otel_plugin.h b/src/cpp/ext/otel/otel_plugin.h index 3468e167632..3d1fdd3440e 100644 --- a/src/cpp/ext/otel/otel_plugin.h +++ b/src/cpp/ext/otel/otel_plugin.h @@ -89,8 +89,7 @@ class LabelsInjector { // false when callback returns false. virtual bool AddOptionalLabels( bool is_client, - absl::Span>> - optional_labels_span, + absl::Span optional_labels, opentelemetry::nostd::function_ref< bool(opentelemetry::nostd::string_view, opentelemetry::common::AttributeValue)> @@ -100,8 +99,8 @@ class LabelsInjector { // produce through the AddOptionalLabels method. virtual size_t GetOptionalLabelsSize( bool is_client, - absl::Span>> - optional_labels_span) const = 0; + absl::Span optional_labels) + const = 0; }; class InternalOpenTelemetryPluginOption @@ -195,7 +194,7 @@ class OpenTelemetryPluginBuilderImpl { server_selector_; std::vector> plugin_options_; - std::shared_ptr> optional_label_keys_; + std::set optional_label_keys_; absl::AnyInvocable channel_scope_filter_; @@ -215,7 +214,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { server_selector, std::vector> plugin_options, - std::shared_ptr> optional_label_keys, + const std::set& optional_label_keys, absl::AnyInvocable< bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const> channel_scope_filter); @@ -365,6 +364,16 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { grpc_core::RegisteredMetricCallback* key_; }; + // Returns the string form of \a key + static absl::string_view OptionalLabelKeyToString( + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key); + + // Returns the OptionalLabelKey form of \a key if \a key is recognized and + // is public, absl::nullopt otherwise. + static absl::optional< + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey> + OptionalLabelStringToKey(absl::string_view key); + // StatsPlugin: std::pair> IsEnabledForChannel( @@ -455,6 +464,9 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { // Instruments for per-call metrics. ClientMetrics client_; ServerMetrics server_; + static constexpr int kOptionalLabelsSizeLimit = 64; + using OptionalLabelsBitSet = std::bitset; + OptionalLabelsBitSet per_call_optional_label_bits_; // Instruments for non-per-call metrics. struct Disabled {}; using Instrument = absl::variant< @@ -464,8 +476,6 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin { std::unique_ptr>, std::unique_ptr>, std::unique_ptr>>; - static constexpr int kOptionalLabelsSizeLimit = 64; - using OptionalLabelsBitSet = std::bitset; struct InstrumentData { Instrument instrument; OptionalLabelsBitSet optional_labels_bits; diff --git a/src/cpp/ext/otel/otel_server_call_tracer.cc b/src/cpp/ext/otel/otel_server_call_tracer.cc index 21d09153275..c7c04323117 100644 --- a/src/cpp/ext/otel/otel_server_call_tracer.cc +++ b/src/cpp/ext/otel/otel_server_call_tracer.cc @@ -113,7 +113,7 @@ void OpenTelemetryPlugin::ServerCallTracer::RecordEnd( // Currently we do not have any optional labels on the server side. KeyValueIterable labels( injected_labels_from_plugin_options_, additional_labels, - /*active_plugin_options_view=*/nullptr, /*optional_labels_span=*/{}, + /*active_plugin_options_view=*/nullptr, /*optional_labels=*/{}, /*is_client=*/false, otel_plugin_); if (otel_plugin_->server_.call.duration != nullptr) { otel_plugin_->server_.call.duration->Record( diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h index d49ca42bd20..1aefd75d5f6 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h @@ -73,10 +73,9 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { void RecordAnnotation(absl::string_view annotation) override; void RecordAnnotation(const Annotation& annotation) override; std::shared_ptr StartNewTcpTrace() override; - void AddOptionalLabels( - OptionalLabelComponent /*component*/, - std::shared_ptr> /*labels*/) - override {} + void SetOptionalLabel(OptionalLabelKey /*key*/, + grpc_core::RefCountedStringValue /*value*/) override { + } private: // Maximum size of trace context is sent on the wire. diff --git a/test/core/end2end/tests/http2_stats.cc b/test/core/end2end/tests/http2_stats.cc index ffb2efdd7b8..5e497f7b8f2 100644 --- a/test/core/end2end/tests/http2_stats.cc +++ b/test/core/end2end/tests/http2_stats.cc @@ -99,10 +99,8 @@ class FakeCallTracer : public ClientCallTracer { void RecordAnnotation(absl::string_view /*annotation*/) override {} void RecordAnnotation(const Annotation& /*annotation*/) override {} - void AddOptionalLabels( - OptionalLabelComponent /*component*/, - std::shared_ptr> /*labels*/) - override {} + void SetOptionalLabel(OptionalLabelKey /*key*/, + RefCountedStringValue /*value*/) override {} static grpc_transport_stream_stats transport_stream_stats() { MutexLock lock(g_mu); diff --git a/test/core/util/fake_stats_plugin.h b/test/core/util/fake_stats_plugin.h index 4c6710ccf88..16f7e996890 100644 --- a/test/core/util/fake_stats_plugin.h +++ b/test/core/util/fake_stats_plugin.h @@ -91,26 +91,22 @@ class FakeClientCallTracer : public ClientCallTracer { std::shared_ptr StartNewTcpTrace() override { return nullptr; } - void AddOptionalLabels( - OptionalLabelComponent component, - std::shared_ptr> labels) override { - optional_labels_.emplace(component, std::move(labels)); + void SetOptionalLabel(OptionalLabelKey key, + RefCountedStringValue value) override { + optional_labels_.emplace(key, std::move(value)); } std::string TraceId() override { return ""; } std::string SpanId() override { return ""; } bool IsSampled() override { return false; } - const std::map>>& - GetOptionalLabels() const { + const std::map& GetOptionalLabels() + const { return optional_labels_; } private: std::vector* annotation_logger_; - std::map>> - optional_labels_; + std::map optional_labels_; }; explicit FakeClientCallTracer(std::vector* annotation_logger) diff --git a/test/core/xds/xds_cluster_resource_type_test.cc b/test/core/xds/xds_cluster_resource_type_test.cc index 2cb04833eec..2537fc8af31 100644 --- a/test/core/xds/xds_cluster_resource_type_test.cc +++ b/test/core/xds/xds_cluster_resource_type_test.cc @@ -1635,10 +1635,8 @@ TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_THAT(*resource.telemetry_labels, - ::testing::UnorderedElementsAre( - ::testing::Pair("service_name", "abc"), - ::testing::Pair("service_namespace", "xyz"))); + EXPECT_EQ(resource.service_telemetry_label.as_string_view(), "abc"); + EXPECT_EQ(resource.namespace_telemetry_label.as_string_view(), "xyz"); } TEST_F(TelemetryLabelTest, MissingMetadataField) { @@ -1653,7 +1651,10 @@ TEST_F(TelemetryLabelTest, MissingMetadataField) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.telemetry_labels, nullptr); + EXPECT_THAT(resource.service_telemetry_label.as_string_view(), + ::testing::IsEmpty()); + EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), + ::testing::IsEmpty()); } TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) { @@ -1671,10 +1672,13 @@ TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_EQ(resource.telemetry_labels, nullptr); + EXPECT_THAT(resource.service_telemetry_label.as_string_view(), + ::testing::IsEmpty()); + EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), + ::testing::IsEmpty()); } -TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) { +TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) { Cluster cluster; cluster.set_type(cluster.EDS); cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); @@ -1684,6 +1688,7 @@ TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) { label_map["bool_value"].set_bool_value(true); label_map["number_value"].set_number_value(3.14); *label_map["string_value"].mutable_string_value() = "abc"; + *label_map["service_name"].mutable_string_value() = "service"; label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE); auto& list_value_values = *label_map["list_value"].mutable_list_value()->mutable_values(); @@ -1700,9 +1705,9 @@ TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) { ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status(); auto& resource = static_cast(**decode_result.resource); - EXPECT_THAT( - *resource.telemetry_labels, - ::testing::UnorderedElementsAre(::testing::Pair("string_value", "abc"))); + EXPECT_THAT(resource.service_telemetry_label.as_string_view(), "service"); + EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(), + ::testing::IsEmpty()); } } // namespace diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 2772a2370cf..41780a96313 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -44,8 +44,8 @@ using ::envoy::config::core::v3::HealthStatus; using ::envoy::type::v3::FractionalPercent; using ClientStats = LrsServiceImpl::ClientStats; -using OptionalLabelComponent = - grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent; +using OptionalLabelKey = + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey; constexpr char kLbDropType[] = "lb"; constexpr char kThrottleDropType[] = "throttle"; @@ -336,38 +336,31 @@ TEST_P(CdsTest, MetricLabels) { ->GetLastCallAttemptTracer() ->GetOptionalLabels(), ::testing::ElementsAre( - ::testing::Pair( - OptionalLabelComponent::kXdsServiceLabels, - ::testing::Pointee(::testing::ElementsAre( - ::testing::Pair("service_name", "myservice"), - ::testing::Pair("service_namespace", "mynamespace")))), - ::testing::Pair( - OptionalLabelComponent::kXdsLocalityLabels, - ::testing::Pointee(::testing::ElementsAre(::testing::Pair( - "grpc.lb.locality", LocalityNameString("locality0"))))))); + ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"), + ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace, + "mynamespace"), + ::testing::Pair(OptionalLabelKey::kLocality, + LocalityNameString("locality0")))); // Send an RPC to backend 1. WaitForBackend(DEBUG_LOCATION, 1); - // Verify that the optional labels are recorded in the call tracer. + // Verify that the optional labels are recorded in the call + // tracer. EXPECT_THAT( fake_client_call_tracer_factory.GetLastFakeClientCallTracer() ->GetLastCallAttemptTracer() ->GetOptionalLabels(), ::testing::ElementsAre( - ::testing::Pair( - OptionalLabelComponent::kXdsServiceLabels, - ::testing::Pointee(::testing::ElementsAre( - ::testing::Pair("service_name", "myservice"), - ::testing::Pair("service_namespace", "mynamespace")))), - ::testing::Pair( - OptionalLabelComponent::kXdsLocalityLabels, - ::testing::Pointee(::testing::ElementsAre(::testing::Pair( - "grpc.lb.locality", LocalityNameString("locality1"))))))); - // TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. - // The only reason it's here is to add a delay before - // fake_client_call_tracer_factory goes out of scope, since there may - // be lingering callbacks in the call stack that are using the - // CallAttemptTracer even after we get here, which would then cause a - // crash. Find a cleaner way to fix this. + ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"), + ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace, + "mynamespace"), + ::testing::Pair(OptionalLabelKey::kLocality, + LocalityNameString("locality1")))); + // TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. The + // only reason it's here is to add a delay before + // fake_client_call_tracer_factory goes out of scope, since there may be + // lingering callbacks in the call stack that are using the CallAttemptTracer + // even after we get here, which would then cause a crash. Find a cleaner way + // to fix this. balancer_->Shutdown(); } diff --git a/test/cpp/ext/csm/metadata_exchange_test.cc b/test/cpp/ext/csm/metadata_exchange_test.cc index 18a743dcd61..4d6c1f271ec 100644 --- a/test/cpp/ext/csm/metadata_exchange_test.cc +++ b/test/cpp/ext/csm/metadata_exchange_test.cc @@ -43,6 +43,8 @@ namespace grpc { namespace testing { namespace { +using OptionalLabelKey = + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey; using ::testing::ElementsAre; using ::testing::Pair; @@ -155,9 +157,12 @@ class MetadataExchangeTest : public OpenTelemetryPluginEnd2EndTest, public ::testing::WithParamInterface { protected: - void Init(const std::vector& metric_names, - bool enable_client_side_injector = true, - std::map labels_to_inject = {}) { + void Init( + const std::vector& metric_names, + bool enable_client_side_injector = true, + std::map + labels_to_inject = {}) { const char* kBootstrap = "{\"node\": {\"id\": " "\"projects/1234567890/networks/mesh:mesh-id/nodes/" @@ -406,7 +411,10 @@ TEST_P(MetadataExchangeTest, VerifyCsmServiceLabels) { kClientAttemptDurationInstrumentName}, /*enable_client_side_injector=*/true, // Injects CSM service labels to be recorded in the call. - {{"service_name", "myservice"}, {"service_namespace", "mynamespace"}}); + {{OptionalLabelKey::kXdsServiceName, + grpc_core::RefCountedStringValue("myservice")}, + {OptionalLabelKey::kXdsServiceNamespace, + grpc_core::RefCountedStringValue("mynamespace")}}); SendRPC(); const char* kMetricName = "grpc.client.attempt.duration"; auto data = ReadCurrentMetricsData( diff --git a/test/cpp/ext/otel/otel_plugin_test.cc b/test/cpp/ext/otel/otel_plugin_test.cc index 7c244f50713..0fd5e811309 100644 --- a/test/cpp/ext/otel/otel_plugin_test.cc +++ b/test/cpp/ext/otel/otel_plugin_test.cc @@ -844,6 +844,170 @@ TEST_F(OpenTelemetryPluginEnd2EndTest, EXPECT_EQ(*status_value, "UNIMPLEMENTED"); } +TEST_F(OpenTelemetryPluginEnd2EndTest, OptionalPerCallLocalityLabel) { + Init( + std::move(Options() + .set_metric_names({grpc::OpenTelemetryPluginBuilder:: + kClientAttemptStartedInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kClientAttemptDurationInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kServerCallStartedInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kServerCallDurationInstrumentName}) + .add_optional_label("grpc.lb.locality") + .set_labels_to_inject( + {{grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kLocality, + grpc_core::RefCountedStringValue("locality")}}))); + SendRPC(); + auto data = ReadCurrentMetricsData( + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { + return !data.contains("grpc.client.attempt.started") || + !data.contains("grpc.client.attempt.duration") || + !data.contains("grpc.server.call.started") || + !data.contains("grpc.server.call.duration"); + }); + // Verify client side metric (grpc.client.attempt.started) does not sees this + // label + ASSERT_EQ(data["grpc.client.attempt.started"].size(), 1); + const auto& client_attributes = + data["grpc.client.attempt.started"][0].attributes.GetAttributes(); + EXPECT_THAT( + client_attributes, + ::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality")))); + // Verify client side metric (grpc.client.attempt.duration) sees this label. + ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1); + const auto& client_duration_attributes = + data["grpc.client.attempt.duration"][0].attributes.GetAttributes(); + EXPECT_EQ( + absl::get(client_duration_attributes.at("grpc.lb.locality")), + "locality"); + // Verify server metric (grpc.server.call.started) does not see this label + ASSERT_EQ(data["grpc.server.call.started"].size(), 1); + const auto& server_attributes = + data["grpc.server.call.started"][0].attributes.GetAttributes(); + EXPECT_THAT( + server_attributes, + ::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality")))); + // Verify server metric (grpc.server.call.duration) does not see this label + ASSERT_EQ(data["grpc.server.call.duration"].size(), 1); + const auto& server_duration_attributes = + data["grpc.server.call.duration"][0].attributes.GetAttributes(); + EXPECT_THAT( + server_duration_attributes, + ::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality")))); +} + +// Tests that when locality label is enabled on the plugin but not provided by +// gRPC, an empty value is recorded. +TEST_F(OpenTelemetryPluginEnd2EndTest, + OptionalPerCallLocalityLabelWhenNotAvailable) { + Init(std::move( + Options() + .set_metric_names({grpc::OpenTelemetryPluginBuilder:: + kClientAttemptDurationInstrumentName}) + .add_optional_label("grpc.lb.locality"))); + SendRPC(); + auto data = ReadCurrentMetricsData( + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return !data.contains("grpc.client.attempt.duration"); }); + // Verify client side metric (grpc.client.attempt.duration) sees the empty + // label value. + ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1); + const auto& client_duration_attributes = + data["grpc.client.attempt.duration"][0].attributes.GetAttributes(); + EXPECT_EQ( + absl::get(client_duration_attributes.at("grpc.lb.locality")), + ""); +} + +// Tests that when locality label is injected but not enabled by the plugin, the +// label is not recorded. +TEST_F(OpenTelemetryPluginEnd2EndTest, + OptionalPerCallLocalityLabelNotRecordedWhenNotEnabled) { + Init(std::move( + Options() + .set_metric_names({grpc::OpenTelemetryPluginBuilder:: + kClientAttemptDurationInstrumentName}) + .set_labels_to_inject( + {{grpc_core::ClientCallTracer::CallAttemptTracer:: + OptionalLabelKey::kLocality, + grpc_core::RefCountedStringValue("locality")}}))); + SendRPC(); + auto data = ReadCurrentMetricsData( + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { return !data.contains("grpc.client.attempt.duration"); }); + // Verify client side metric (grpc.client.attempt.duration) does not see the + // locality label. + ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1); + const auto& client_duration_attributes = + data["grpc.client.attempt.duration"][0].attributes.GetAttributes(); + EXPECT_THAT( + client_duration_attributes, + ::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality")))); +} + +TEST_F(OpenTelemetryPluginEnd2EndTest, + UnknownLabelDoesNotShowOnPerCallMetrics) { + Init( + std::move(Options() + .set_metric_names({grpc::OpenTelemetryPluginBuilder:: + kClientAttemptStartedInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kClientAttemptDurationInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kServerCallStartedInstrumentName, + grpc::OpenTelemetryPluginBuilder:: + kServerCallDurationInstrumentName}) + .add_optional_label("unknown"))); + SendRPC(); + auto data = ReadCurrentMetricsData( + [&](const absl::flat_hash_map< + std::string, + std::vector>& + data) { + return !data.contains("grpc.client.attempt.started") || + !data.contains("grpc.client.attempt.duration") || + !data.contains("grpc.server.call.started") || + !data.contains("grpc.server.call.duration"); + }); + // Verify client side metric (grpc.client.attempt.started) does not sees this + // label + ASSERT_EQ(data["grpc.client.attempt.started"].size(), 1); + const auto& client_attributes = + data["grpc.client.attempt.started"][0].attributes.GetAttributes(); + EXPECT_THAT(client_attributes, + ::testing::Not(::testing::Contains(::testing::Key("unknown")))); + // Verify client side metric (grpc.client.attempt.duration) does not see this + // label + ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1); + const auto& client_duration_attributes = + data["grpc.client.attempt.duration"][0].attributes.GetAttributes(); + EXPECT_THAT(client_duration_attributes, + ::testing::Not(::testing::Contains(::testing::Key("unknown")))); + // Verify server metric (grpc.server.call.started) does not see this label + ASSERT_EQ(data["grpc.server.call.started"].size(), 1); + const auto& server_attributes = + data["grpc.server.call.started"][0].attributes.GetAttributes(); + EXPECT_THAT( + server_attributes, + ::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality")))); + // Verify server metric (grpc.server.call.duration) does not see this label + ASSERT_EQ(data["grpc.server.call.duration"].size(), 1); + const auto& server_duration_attributes = + data["grpc.server.call.duration"][0].attributes.GetAttributes(); + EXPECT_THAT(server_duration_attributes, + ::testing::Not(::testing::Contains(::testing::Key("unknown")))); +} + using OpenTelemetryPluginOptionEnd2EndTest = OpenTelemetryPluginEnd2EndTest; class SimpleLabelIterable : public grpc::internal::LabelsIterable { @@ -886,20 +1050,18 @@ class CustomLabelInjector : public grpc::internal::LabelsInjector { grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/) const override {} - bool AddOptionalLabels( - bool /*is_client*/, - absl::Span>> - /*optional_labels_span*/, - opentelemetry::nostd::function_ref< - bool(opentelemetry::nostd::string_view, - opentelemetry::common::AttributeValue)> - /*callback*/) const override { + bool AddOptionalLabels(bool /*is_client*/, + absl::Span + /*optional_labels*/, + opentelemetry::nostd::function_ref< + bool(opentelemetry::nostd::string_view, + opentelemetry::common::AttributeValue)> + /*callback*/) const override { return true; } size_t GetOptionalLabelsSize( - bool /*is_client*/, - absl::Span>> + bool /*is_client*/, absl::Span /*optional_labels_span*/) const override { return 0; } diff --git a/test/cpp/ext/otel/otel_test_library.cc b/test/cpp/ext/otel/otel_test_library.cc index a921763ef66..0aac08a4da3 100644 --- a/test/cpp/ext/otel/otel_test_library.cc +++ b/test/cpp/ext/otel/otel_test_library.cc @@ -46,15 +46,16 @@ namespace testing { // A subchannel filter that adds the service labels for test to the // CallAttemptTracer in a call. -class AddServiceLabelsFilter : public grpc_core::ChannelFilter { +class AddLabelsFilter : public grpc_core::ChannelFilter { public: static const grpc_channel_filter kFilter; - static absl::StatusOr Create( + static absl::StatusOr Create( const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) { - return AddServiceLabelsFilter( - args.GetPointer>( - GRPC_ARG_LABELS_TO_INJECT)); + return AddLabelsFilter( + *args.GetPointer>(GRPC_ARG_LABELS_TO_INJECT)); } grpc_core::ArenaPromise MakeCallPromise( @@ -65,23 +66,27 @@ class AddServiceLabelsFilter : public grpc_core::ChannelFilter { auto* call_tracer = static_cast( call_context[GRPC_CONTEXT_CALL_TRACER].value); EXPECT_NE(call_tracer, nullptr); - call_tracer->AddOptionalLabels( - CallAttemptTracer::OptionalLabelComponent::kXdsServiceLabels, - std::make_shared>( - *labels_to_inject_)); + for (const auto& pair : labels_to_inject_) { + call_tracer->SetOptionalLabel(pair.first, pair.second); + } return next_promise_factory(std::move(call_args)); } private: - explicit AddServiceLabelsFilter( - const std::map* labels_to_inject) - : labels_to_inject_(labels_to_inject) {} + explicit AddLabelsFilter( + std::map + labels_to_inject) + : labels_to_inject_(std::move(labels_to_inject)) {} - const std::map* labels_to_inject_; + const std::map< + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, + grpc_core::RefCountedStringValue> + labels_to_inject_; }; -const grpc_channel_filter AddServiceLabelsFilter::kFilter = - grpc_core::MakePromiseBasedFilter( "add_service_labels_filter"); @@ -135,11 +140,11 @@ void OpenTelemetryPluginEnd2EndTest::Init(Options config) { grpc_core::CoreConfiguration::Reset(); ChannelArguments channel_args; if (!config.labels_to_inject.empty()) { - labels_to_inject_ = config.labels_to_inject; + labels_to_inject_ = std::move(config.labels_to_inject); grpc_core::CoreConfiguration::RegisterBuilder( [](grpc_core::CoreConfiguration::Builder* builder) mutable { - builder->channel_init()->RegisterFilter( - GRPC_CLIENT_SUBCHANNEL, &AddServiceLabelsFilter::kFilter); + builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, + &AddLabelsFilter::kFilter); }); channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_); } diff --git a/test/cpp/ext/otel/otel_test_library.h b/test/cpp/ext/otel/otel_test_library.h index 66ef2d3577e..a11dc177934 100644 --- a/test/cpp/ext/otel/otel_test_library.h +++ b/test/cpp/ext/otel/otel_test_library.h @@ -78,7 +78,11 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test { return *this; } - Options& set_labels_to_inject(std::map labels) { + Options& set_labels_to_inject( + std::map< + grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey, + grpc_core::RefCountedStringValue> + labels) { labels_to_inject = std::move(labels); return *this; } @@ -132,7 +136,9 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test { opentelemetry::sdk::resource::Resource::Create({})); std::unique_ptr labels_injector; bool use_meter_provider = true; - std::map labels_to_inject; + std::map + labels_to_inject; absl::AnyInvocable channel_scope_filter; @@ -199,7 +205,9 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test { const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo"; const absl::string_view kGenericMethodName = "foo/bar"; - std::map labels_to_inject_; + std::map + labels_to_inject_; std::shared_ptr reader_; std::string server_address_; std::string canonical_server_address_;