[OTel C++] Add experimental optional locality label available to client per-attempt metrics (#36254)

As per https://github.com/grpc/proposal/pull/419, the experimental optional label `grpc.lb.locality` is added to the follow per-call metrics -
* grpc.client.attempt.duration
* grpc.client.attempt.sent_total_compressed_message_size
* grpc.client.attempt.rcvd_total_compressed_message_size

Closes #36254

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36254 from yashykt:OTelOptionalLabelsOnPerCall c5390c99a1
PiperOrigin-RevId: 622973959
pull/32500/merge
Yash Tibrewal 10 months ago committed by Copybara-Service
parent 7c57fb70e9
commit 70839a9b19
  1. 3
      BUILD
  2. 3
      src/core/BUILD
  3. 2
      src/core/ext/xds/xds_client.cc
  4. 4
      src/core/ext/xds/xds_client_stats.cc
  5. 18
      src/core/ext/xds/xds_client_stats.h
  6. 18
      src/core/ext/xds/xds_cluster.cc
  7. 7
      src/core/ext/xds/xds_cluster.h
  8. 5
      src/core/ext/xds/xds_endpoint.cc
  9. 7
      src/core/lib/channel/call_tracer.cc
  10. 21
      src/core/lib/channel/call_tracer.h
  11. 8
      src/core/load_balancing/xds/cds.cc
  12. 57
      src/core/load_balancing/xds/xds_cluster_impl.cc
  13. 11
      src/core/load_balancing/xds/xds_wrr_locality.cc
  14. 7
      src/core/resolver/xds/xds_dependency_manager.cc
  15. 48
      src/cpp/ext/csm/metadata_exchange.cc
  16. 9
      src/cpp/ext/csm/metadata_exchange.h
  17. 5
      src/cpp/ext/filters/census/open_census_call_tracer.h
  18. 28
      src/cpp/ext/otel/key_value_iterable.h
  19. 18
      src/cpp/ext/otel/otel_client_call_tracer.cc
  20. 13
      src/cpp/ext/otel/otel_client_call_tracer.h
  21. 46
      src/cpp/ext/otel/otel_plugin.cc
  22. 26
      src/cpp/ext/otel/otel_plugin.h
  23. 2
      src/cpp/ext/otel/otel_server_call_tracer.cc
  24. 7
      src/python/grpcio_observability/grpc_observability/client_call_tracer.h
  25. 6
      test/core/end2end/tests/http2_stats.cc
  26. 16
      test/core/util/fake_stats_plugin.h
  27. 25
      test/core/xds/xds_cluster_resource_type_test.cc
  28. 47
      test/cpp/end2end/xds/xds_cluster_end2end_test.cc
  29. 16
      test/cpp/ext/csm/metadata_exchange_test.cc
  30. 182
      test/cpp/ext/otel/otel_plugin_test.cc
  31. 41
      test/cpp/ext/otel/otel_test_library.cc
  32. 14
      test/cpp/ext/otel/otel_test_library.h

@ -1708,6 +1708,7 @@ grpc_cc_library(
external_deps = [
"absl/status",
"absl/strings",
"absl/types:optional",
],
language = "c++",
visibility = ["@grpc:alt_grpc_base_legacy"],
@ -1721,6 +1722,7 @@ grpc_cc_library(
"//src/core:context",
"//src/core:error",
"//src/core:metadata_batch",
"//src/core:ref_counted_string",
"//src/core:slice_buffer",
],
)
@ -4269,6 +4271,7 @@ grpc_cc_library(
visibility = ["@grpc:xds_client_core"],
deps = [
"backoff",
"call_tracer",
"debug_location",
"endpoint_addresses",
"envoy_admin_upb",

@ -5348,10 +5348,12 @@ grpc_cc_library(
"match",
"pollset_set",
"ref_counted",
"ref_counted_string",
"resolved_address",
"subchannel_interface",
"validation_errors",
"xds_dependency_manager",
"//:call_tracer",
"//:config",
"//:debug_location",
"//:endpoint_addresses",
@ -5433,6 +5435,7 @@ grpc_cc_library(
"lb_policy_factory",
"lb_policy_registry",
"pollset_set",
"ref_counted_string",
"validation_errors",
"//:config",
"//:debug_location",

@ -2026,7 +2026,7 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
"[xds_client %p] cluster=%s eds_service_name=%s "
"locality=%s locality_stats=%p",
this, cluster_key.first.c_str(), cluster_key.second.c_str(),
locality_name->AsHumanReadableString().c_str(),
locality_name->human_readable_string().c_str(),
locality_state.locality_stats);
}
}

@ -111,7 +111,7 @@ XdsClusterLocalityStats::XdsClusterLocalityStats(
xds_client_.get(), this, std::string(lrs_server_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());
name_->human_readable_string().c_str());
}
}
@ -122,7 +122,7 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() {
xds_client_.get(), this, std::string(lrs_server_).c_str(),
std::string(cluster_name_).c_str(),
std::string(eds_service_name_).c_str(),
name_->AsHumanReadableString().c_str());
name_->human_readable_string().c_str());
}
xds_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
eds_service_name_, name_, this);

@ -32,6 +32,7 @@
#include "absl/strings/string_view.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -64,10 +65,9 @@ class XdsLocalityName final : public RefCounted<XdsLocalityName> {
: region_(std::move(region)),
zone_(std::move(zone)),
sub_zone_(std::move(sub_zone)),
locality_labels_(
std::make_shared<std::map<std::string, std::string>>()) {
locality_labels_->emplace("grpc.lb.locality", AsHumanReadableString());
}
human_readable_string_(
absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
region_, zone_, sub_zone_)) {}
bool operator==(const XdsLocalityName& other) const {
return region_ == other.region_ && zone_ == other.zone_ &&
@ -89,13 +89,9 @@ class XdsLocalityName final : public RefCounted<XdsLocalityName> {
const std::string& region() const { return region_; }
const std::string& zone() const { return zone_; }
const std::string& sub_zone() const { return sub_zone_; }
std::shared_ptr<std::map<std::string, std::string>> locality_labels() const {
return locality_labels_;
}
std::string AsHumanReadableString() const {
return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}",
region_, zone_, sub_zone_);
const RefCountedStringValue& human_readable_string() const {
return human_readable_string_;
}
// Channel args traits.
@ -111,7 +107,7 @@ class XdsLocalityName final : public RefCounted<XdsLocalityName> {
std::string region_;
std::string zone_;
std::string sub_zone_;
std::shared_ptr<std::map<std::string, std::string>> locality_labels_;
RefCountedStringValue human_readable_string_;
};
// Drop stats for an xds cluster.

@ -714,8 +714,6 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
StdStringToUpbString(
absl::string_view("com.google.csm.telemetry_labels")),
&telemetry_labels_struct)) {
auto telemetry_labels =
std::make_shared<std::map<std::string, std::string>>();
size_t iter = kUpb_Map_Begin;
const google_protobuf_Struct_FieldsEntry* fields_entry;
while ((fields_entry = google_protobuf_Struct_fields_next(
@ -724,15 +722,17 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
const google_protobuf_Value* value =
google_protobuf_Struct_FieldsEntry_value(fields_entry);
if (google_protobuf_Value_has_string_value(value)) {
telemetry_labels->emplace(
UpbStringToStdString(
google_protobuf_Struct_FieldsEntry_key(fields_entry)),
UpbStringToStdString(google_protobuf_Value_string_value(value)));
if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
fields_entry)) == "service_name") {
cds_update->service_telemetry_label = RefCountedStringValue(
UpbStringToAbsl(google_protobuf_Value_string_value(value)));
} else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
fields_entry)) == "service_namespace") {
cds_update->namespace_telemetry_label = RefCountedStringValue(
UpbStringToAbsl(google_protobuf_Value_string_value(value)));
}
}
}
if (!telemetry_labels->empty()) {
cds_update->telemetry_labels = std::move(telemetry_labels);
}
}
}
// Return result.

@ -101,7 +101,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
XdsHealthStatusSet override_host_statuses;
std::shared_ptr<std::map<std::string, std::string>> telemetry_labels;
RefCountedStringValue service_telemetry_label;
RefCountedStringValue namespace_telemetry_label;
bool operator==(const XdsClusterResource& other) const {
return type == other.type && lb_policy_config == other.lb_policy_config &&
@ -110,7 +111,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
connection_idle_timeout == other.connection_idle_timeout &&
max_concurrent_requests == other.max_concurrent_requests &&
outlier_detection == other.outlier_detection &&
override_host_statuses == other.override_host_statuses;
override_host_statuses == other.override_host_statuses &&
service_telemetry_label == other.service_telemetry_label &&
namespace_telemetry_label == other.namespace_telemetry_label;
}
std::string ToString() const;

@ -81,7 +81,7 @@ std::string XdsEndpointResource::Priority::Locality::ToString() const {
for (const EndpointAddresses& endpoint : endpoints) {
endpoint_strings.emplace_back(endpoint.ToString());
}
return absl::StrCat("{name=", name->AsHumanReadableString(),
return absl::StrCat("{name=", name->human_readable_string().as_string_view(),
", lb_weight=", lb_weight, ", endpoints=[",
absl::StrJoin(endpoint_strings, ", "), "]}");
}
@ -420,7 +420,8 @@ absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
if (it != locality_map.end()) {
errors.AddError(absl::StrCat(
"duplicate locality ",
parsed_locality->locality.name->AsHumanReadableString(),
parsed_locality->locality.name->human_readable_string()
.as_string_view(),
" found in priority ", parsed_locality->priority));
} else {
locality_map.emplace(parsed_locality->locality.name.get(),

@ -151,11 +151,10 @@ class DelegatingClientCallTracer : public ClientCallTracer {
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) override {
void SetOptionalLabel(OptionalLabelKey key,
RefCountedStringValue value) override {
for (auto* tracer : tracers_) {
tracer->AddOptionalLabels(component, labels);
tracer->SetOptionalLabel(key, value);
}
}
std::string TraceId() override { return tracers_[0]->TraceId(); }

@ -26,12 +26,14 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -126,13 +128,16 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
// as transparent retry attempts.)
class CallAttemptTracer : public CallTracerInterface {
public:
enum class OptionalLabelComponent : std::uint8_t {
kXdsServiceLabels,
kXdsLocalityLabels,
kSize, // keep last
// Note that not all of the optional label keys are exposed as public API.
enum class OptionalLabelKey : std::uint8_t {
kXdsServiceName, // not public
kXdsServiceNamespace, // not public
kLocality,
kSize // should be last
};
~CallAttemptTracer() override {}
// TODO(yashykt): The following two methods `RecordReceivedTrailingMetadata`
// and `RecordEnd` should be moved into CallTracerInterface.
// If the call was cancelled before the recv_trailing_metadata op
@ -145,10 +150,10 @@ class ClientCallTracer : public CallTracerAnnotationInterface {
// library is free to destroy the object.
virtual void RecordEnd(const gpr_timespec& latency) = 0;
// Adds optional labels to be reported by the underlying tracer in a call.
virtual void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) = 0;
// Sets an optional label on the per-attempt metrics recorded at the end of
// the attempt.
virtual void SetOptionalLabel(OptionalLabelKey key,
RefCountedStringValue value) = 0;
};
~ClientCallTracer() override {}

@ -37,9 +37,6 @@
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/load_balancing/address_filtering.h"
#include "src/core/load_balancing/outlier_detection/outlier_detection.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_cluster.h"
#include "src/core/ext/xds/xds_common_types.h"
#include "src/core/ext/xds/xds_health_status.h"
@ -59,10 +56,13 @@
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/json/json_writer.h"
#include "src/core/load_balancing/address_filtering.h"
#include "src/core/load_balancing/delegating_helper.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/outlier_detection/outlier_detection.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/resolver/xds/xds_dependency_manager.h"
namespace grpc_core {
@ -254,7 +254,7 @@ class PriorityEndpointIterator final : public EndpointAddressesIterator {
const auto& locality = p.second;
std::vector<RefCountedStringValue> hierarchical_path = {
RefCountedStringValue(priority_child_name),
RefCountedStringValue(locality_name->AsHumanReadableString())};
locality_name->human_readable_string()};
auto hierarchical_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
for (const auto& endpoint : locality.endpoints) {

@ -38,15 +38,13 @@
#include <grpc/support/log.h>
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_client_grpc.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/ext/xds/xds_endpoint.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
@ -55,6 +53,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/pollset_set.h"
@ -64,11 +63,14 @@
#include "src/core/lib/json/json_object_loader.h"
#include "src/core/lib/security/credentials/xds/xds_credentials.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/delegating_helper.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/xds/xds_dependency_manager.h"
@ -78,8 +80,6 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace {
using OptionalLabelComponent =
ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
using XdsConfig = XdsDependencyManager::XdsConfig;
//
@ -193,11 +193,11 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
class StatsSubchannelWrapper final : public DelegatingSubchannel {
public:
// If load reporting is enabled and we have an XdsClusterLocalityStats
// object, that object already contains the locality labels. We
// need to store the locality labels directly only in the case where
// object, that object already contains the locality label. We
// need to store the locality label directly only in the case where
// load reporting is disabled.
using LocalityData = absl::variant<
std::shared_ptr<std::map<std::string, std::string>> /*locality_labels*/,
RefCountedStringValue /*locality*/,
RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>;
StatsSubchannelWrapper(
@ -206,23 +206,19 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
: DelegatingSubchannel(std::move(wrapped_subchannel)),
locality_data_(std::move(locality_data)) {}
std::shared_ptr<std::map<std::string, std::string>> locality_labels()
const {
RefCountedStringValue locality() const {
return Match(
locality_data_,
[](const std::shared_ptr<std::map<std::string, std::string>>&
locality_labels) {
return locality_labels;
},
[](RefCountedStringValue locality) { return locality; },
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
return locality_stats->locality_name()->locality_labels();
return locality_stats->locality_name()->human_readable_string();
});
}
XdsClusterLocalityStats* locality_stats() const {
return Match(
locality_data_,
[](const std::shared_ptr<std::map<std::string, std::string>>&) {
[](const RefCountedStringValue&) {
return static_cast<XdsClusterLocalityStats*>(nullptr);
},
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
@ -247,7 +243,8 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
uint32_t max_concurrent_requests_;
std::shared_ptr<std::map<std::string, std::string>> service_labels_;
RefCountedStringValue service_telemetry_label_;
RefCountedStringValue namespace_telemetry_label_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<SubchannelPicker> picker_;
@ -391,7 +388,10 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
: call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_(
xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
service_labels_(xds_cluster_impl_lb->cluster_resource_->telemetry_labels),
service_telemetry_label_(
xds_cluster_impl_lb->cluster_resource_->service_telemetry_label),
namespace_telemetry_label_(
xds_cluster_impl_lb->cluster_resource_->namespace_telemetry_label),
drop_config_(xds_cluster_impl_lb->drop_config_),
drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) {
@ -406,8 +406,13 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
if (call_attempt_tracer != nullptr) {
call_attempt_tracer->AddOptionalLabels(
OptionalLabelComponent::kXdsServiceLabels, service_labels_);
call_attempt_tracer->SetOptionalLabel(
ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName,
service_telemetry_label_);
call_attempt_tracer->SetOptionalLabel(
ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
kXdsServiceNamespace,
namespace_telemetry_label_);
}
// Handle EDS drops.
const std::string* drop_category;
@ -436,11 +441,11 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
if (complete_pick != nullptr) {
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Add locality labels to per-call metrics if needed.
// Add locality label to per-call metrics if needed.
if (call_attempt_tracer != nullptr) {
call_attempt_tracer->AddOptionalLabels(
OptionalLabelComponent::kXdsLocalityLabels,
subchannel_wrapper->locality_labels());
call_attempt_tracer->SetOptionalLabel(
ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality,
subchannel_wrapper->locality());
}
// Handle load reporting.
RefCountedPtr<XdsClusterLocalityStats> locality_stats;
@ -779,7 +784,7 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
const grpc_resolved_address& address, const ChannelArgs& per_address_args,
const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
// Wrap the subchannel so that we pass along the locality labels and
// Wrap the subchannel so that we pass along the locality label and
// (if load reporting is enabled) the locality stats object, which
// will be used by the picker.
auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
@ -806,7 +811,7 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
if (locality_stats != nullptr) {
locality_data = std::move(locality_stats);
} else {
locality_data = locality_name->locality_labels();
locality_data = locality_name->human_readable_string();
}
return MakeRefCounted<StatsSubchannelWrapper>(
parent()->channel_control_helper()->CreateSubchannel(

@ -32,7 +32,6 @@
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/ext/xds/xds_client_stats.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
@ -40,6 +39,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/json/json.h"
@ -50,6 +50,7 @@
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_factory.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/xds/xds_channel_args.h"
#include "src/core/resolver/endpoint_addresses.h"
namespace grpc_core {
@ -167,7 +168,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
}
auto config = args.config.TakeAsSubclass<XdsWrrLocalityLbConfig>();
// Scan the addresses to find the weight for each locality.
std::map<std::string, uint32_t> locality_weights;
std::map<RefCountedStringValue, uint32_t> locality_weights;
if (args.addresses.ok()) {
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
auto* locality_name = endpoint.args().GetObject<XdsLocalityName>();
@ -175,7 +176,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
if (locality_name != nullptr && weight > 0) {
auto p = locality_weights.emplace(
locality_name->AsHumanReadableString(), weight);
locality_name->human_readable_string(), weight);
if (!p.second && p.first->second != weight) {
gpr_log(GPR_ERROR,
"INTERNAL ERROR: xds_wrr_locality found different weights "
@ -188,10 +189,10 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
// Construct the config for the weighted_target policy.
Json::Object weighted_targets;
for (const auto& p : locality_weights) {
const std::string& locality_name = p.first;
absl::string_view locality_name = p.first.as_string_view();
uint32_t weight = p.second;
// Add weighted target entry.
weighted_targets[locality_name] = Json::FromObject({
weighted_targets[std::string(locality_name)] = Json::FromObject({
{"weight", Json::FromNumber(weight)},
{"childPolicy", config->child_config()},
});

@ -18,6 +18,8 @@
#include "src/core/resolver/xds/xds_dependency_manager.h"
#include <set>
#include "absl/strings/str_join.h"
#include "src/core/ext/xds/xds_routing.h"
@ -635,11 +637,12 @@ void XdsDependencyManager::OnEndpointUpdate(
it->second.update.resolution_note =
absl::StrCat("EDS resource ", name, " contains no localities");
} else {
std::set<std::string> empty_localities;
std::set<absl::string_view> empty_localities;
for (const auto& priority : endpoint->priorities) {
for (const auto& p : priority.localities) {
if (p.second.endpoints.empty()) {
empty_localities.insert(p.first->AsHumanReadableString());
empty_localities.insert(
p.first->human_readable_string().as_string_view());
}
}
}

@ -52,8 +52,8 @@
namespace grpc {
namespace internal {
using OptionalLabelComponent =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
using OptionalLabelKey =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
namespace {
@ -428,8 +428,7 @@ void ServiceMeshLabelsInjector::AddLabels(
bool ServiceMeshLabelsInjector::AddOptionalLabels(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
@ -438,35 +437,26 @@ bool ServiceMeshLabelsInjector::AddOptionalLabels(
// Currently the CSM optional labels are only set on client.
return true;
}
// According to the CSM Observability Metric spec, if the control plane fails
// to provide these labels, the client will set their values to "unknown".
// These default values are set below.
absl::string_view service_name = "unknown";
absl::string_view service_namespace = "unknown";
// Performs JSON label name format to CSM Observability Metric spec format
// conversion.
if (optional_labels_span.size() >
static_cast<size_t>(OptionalLabelComponent::kXdsServiceLabels)) {
const auto& optional_labels = optional_labels_span[static_cast<size_t>(
OptionalLabelComponent::kXdsServiceLabels)];
if (optional_labels != nullptr) {
auto it = optional_labels->find("service_name");
if (it != optional_labels->end()) service_name = it->second;
it = optional_labels->find("service_namespace");
if (it != optional_labels->end()) service_namespace = it->second;
}
}
absl::string_view service_name =
optional_labels[static_cast<size_t>(
grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kXdsServiceName)]
.as_string_view();
absl::string_view service_namespace =
optional_labels[static_cast<size_t>(
grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kXdsServiceNamespace)]
.as_string_view();
return callback("csm.service_name",
AbslStrViewToOpenTelemetryStrView(service_name)) &&
service_name.empty()
? "unknown"
: AbslStrViewToOpenTelemetryStrView(service_name)) &&
callback("csm.service_namespace_name",
AbslStrViewToOpenTelemetryStrView(service_namespace));
}
size_t ServiceMeshLabelsInjector::GetOptionalLabelsSize(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>)
const {
return is_client ? 2 : 0;
service_namespace.empty()
? "unknown"
: AbslStrViewToOpenTelemetryStrView(service_namespace));
}
} // namespace internal

@ -55,8 +55,7 @@ class ServiceMeshLabelsInjector : public LabelsInjector {
// Add optional labels to the traced calls.
bool AddOptionalLabels(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
@ -65,8 +64,10 @@ class ServiceMeshLabelsInjector : public LabelsInjector {
// Gets the size of the actual optional labels.
size_t GetOptionalLabelsSize(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span) const override;
absl::Span<const grpc_core::RefCountedStringValue> /*optional_labels*/)
const override {
return is_client ? 2 : 0;
}
const std::vector<std::pair<absl::string_view, std::string>>&
TestOnlyLocalLabels() const {

@ -103,9 +103,8 @@ class OpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordAnnotation(absl::string_view annotation) override;
void RecordAnnotation(const Annotation& annotation) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(
OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>) override {}
void SetOptionalLabel(OptionalLabelKey,
grpc_core::RefCountedStringValue) override {}
experimental::CensusContext* context() { return &context_; }

@ -56,14 +56,13 @@ class OpenTelemetryPlugin::KeyValueIterable
additional_labels,
const OpenTelemetryPlugin::ActivePluginOptionsView*
active_plugin_options_view,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
bool is_client, const OpenTelemetryPlugin* otel_plugin)
: injected_labels_from_plugin_options_(
injected_labels_from_plugin_options),
additional_labels_(additional_labels),
active_plugin_options_view_(active_plugin_options_view),
optional_labels_(optional_labels_span),
optional_labels_(optional_labels),
is_client_(is_client),
otel_plugin_(otel_plugin) {}
@ -100,6 +99,26 @@ class OpenTelemetryPlugin::KeyValueIterable
return false;
}
}
// Add per-call optional labels
if (!optional_labels_.empty()) {
GPR_ASSERT(
optional_labels_.size() ==
static_cast<size_t>(grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kSize));
for (size_t i = 0; i < optional_labels_.size(); ++i) {
if (!otel_plugin_->per_call_optional_label_bits_.test(i)) {
continue;
}
if (!callback(
AbslStrViewToOpenTelemetryStrView(OptionalLabelKeyToString(
static_cast<grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey>(i))),
AbslStrViewToOpenTelemetryStrView(
optional_labels_[i].as_string_view()))) {
return false;
}
}
}
return true;
}
@ -132,8 +151,7 @@ class OpenTelemetryPlugin::KeyValueIterable
additional_labels_;
const OpenTelemetryPlugin::ActivePluginOptionsView*
active_plugin_options_view_;
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_;
absl::Span<const grpc_core::RefCountedStringValue> optional_labels_;
bool is_client_;
const OpenTelemetryPlugin* otel_plugin_;
};

@ -82,8 +82,8 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(
1, KeyValueIterable(
/*injected_labels_from_plugin_options=*/{}, additional_labels,
/*active_plugin_options_view=*/nullptr,
/*optional_labels_span=*/{}, /*is_client=*/true,
parent_->otel_plugin_));
/*optional_labels=*/{},
/*is_client=*/true, parent_->otel_plugin_));
}
}
@ -156,8 +156,8 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(
injected_labels_from_plugin_options_, additional_labels,
&parent_->scope_config_->active_plugin_options_view(),
optional_labels_array_, /*is_client=*/true, parent_->otel_plugin_);
&parent_->scope_config_->active_plugin_options_view(), optional_labels_,
/*is_client=*/true, parent_->otel_plugin_);
if (parent_->otel_plugin_->client_.attempt.duration != nullptr) {
parent_->otel_plugin_->client_.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
@ -209,12 +209,10 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() {
return nullptr;
}
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> optional_labels) {
optional_labels_array_[static_cast<std::size_t>(component)] =
std::move(optional_labels);
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
OptionalLabelKey key, grpc_core::RefCountedStringValue value) {
GPR_ASSERT(key < OptionalLabelKey::kSize);
optional_labels_[static_cast<size_t>(key)] = std::move(value);
}
//

@ -92,9 +92,8 @@ class OpenTelemetryPlugin::ClientCallTracer
void RecordAnnotation(absl::string_view /*annotation*/) override;
void RecordAnnotation(const Annotation& /*annotation*/) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>>
optional_labels) override;
void SetOptionalLabel(OptionalLabelKey key,
grpc_core::RefCountedStringValue value) override;
private:
const ClientCallTracer* parent_;
@ -102,10 +101,10 @@ class OpenTelemetryPlugin::ClientCallTracer
// Start time (for measuring latency).
absl::Time start_time_;
std::unique_ptr<LabelsIterable> injected_labels_;
// The indices of the array correspond to the OptionalLabelComponent enum.
std::array<std::shared_ptr<std::map<std::string, std::string>>,
static_cast<size_t>(OptionalLabelComponent::kSize)>
optional_labels_array_;
// Avoid std::map to avoid per-call allocations.
std::array<grpc_core::RefCountedStringValue,
static_cast<size_t>(OptionalLabelKey::kSize)>
optional_labels_;
std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_;
};

@ -214,10 +214,7 @@ OpenTelemetryPluginBuilderImpl& OpenTelemetryPluginBuilderImpl::AddPluginOption(
OpenTelemetryPluginBuilderImpl&
OpenTelemetryPluginBuilderImpl::AddOptionalLabel(
absl::string_view optional_label_key) {
if (optional_label_keys_ == nullptr) {
optional_label_keys_ = std::make_shared<std::set<absl::string_view>>();
}
optional_label_keys_->emplace(optional_label_key);
optional_label_keys_.emplace(optional_label_key);
return *this;
}
@ -340,7 +337,7 @@ OpenTelemetryPlugin::OpenTelemetryPlugin(
server_selector,
std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
plugin_options,
std::shared_ptr<std::set<absl::string_view>> optional_label_keys,
const std::set<absl::string_view>& optional_label_keys,
absl::AnyInvocable<
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter)
@ -423,6 +420,17 @@ OpenTelemetryPlugin::OpenTelemetryPlugin(
kServerCallRcvdTotalCompressedMessageSizeInstrumentName),
"Compressed message bytes received per server call", "By");
}
// Store optional label keys for per call metrics
GPR_ASSERT(static_cast<size_t>(
grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kSize) <= kOptionalLabelsSizeLimit);
for (const auto& key : optional_label_keys) {
auto optional_key = OptionalLabelStringToKey(key);
if (optional_key.has_value()) {
per_call_optional_label_bits_.set(
static_cast<size_t>(optional_key.value()));
}
}
// Non-per-call metrics.
grpc_core::GlobalInstrumentsRegistry::ForEach(
[&, this](const grpc_core::GlobalInstrumentsRegistry::
@ -527,14 +535,38 @@ OpenTelemetryPlugin::OpenTelemetryPlugin(
descriptor.instrument_type));
}
for (size_t i = 0; i < descriptor.optional_label_keys.size(); ++i) {
if (optional_label_keys->find(descriptor.optional_label_keys[i]) !=
optional_label_keys->end()) {
if (optional_label_keys.find(descriptor.optional_label_keys[i]) !=
optional_label_keys.end()) {
instruments_data_[descriptor.index].optional_labels_bits.set(i);
}
}
});
}
namespace {
constexpr absl::string_view kLocality = "grpc.lb.locality";
}
absl::string_view OpenTelemetryPlugin::OptionalLabelKeyToString(
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key) {
switch (key) {
case grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
kLocality:
return kLocality;
default:
grpc_core::Crash("Illegal OptionalLabelKey index");
}
}
absl::optional<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OpenTelemetryPlugin::OptionalLabelStringToKey(absl::string_view key) {
if (key == kLocality) {
return grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
kLocality;
}
return absl::nullopt;
}
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
OpenTelemetryPlugin::IsEnabledForChannel(
const OpenTelemetryPluginBuilder::ChannelScope& scope) const {

@ -89,8 +89,7 @@ class LabelsInjector {
// false when callback returns false.
virtual bool AddOptionalLabels(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span,
absl::Span<const grpc_core::RefCountedStringValue> optional_labels,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
@ -100,8 +99,8 @@ class LabelsInjector {
// produce through the AddOptionalLabels method.
virtual size_t GetOptionalLabelsSize(
bool is_client,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_span) const = 0;
absl::Span<const grpc_core::RefCountedStringValue> optional_labels)
const = 0;
};
class InternalOpenTelemetryPluginOption
@ -195,7 +194,7 @@ class OpenTelemetryPluginBuilderImpl {
server_selector_;
std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
plugin_options_;
std::shared_ptr<std::set<absl::string_view>> optional_label_keys_;
std::set<absl::string_view> optional_label_keys_;
absl::AnyInvocable<bool(
const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter_;
@ -215,7 +214,7 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
server_selector,
std::vector<std::unique_ptr<InternalOpenTelemetryPluginOption>>
plugin_options,
std::shared_ptr<std::set<absl::string_view>> optional_label_keys,
const std::set<absl::string_view>& optional_label_keys,
absl::AnyInvocable<
bool(const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter);
@ -365,6 +364,16 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
grpc_core::RegisteredMetricCallback* key_;
};
// Returns the string form of \a key
static absl::string_view OptionalLabelKeyToString(
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey key);
// Returns the OptionalLabelKey form of \a key if \a key is recognized and
// is public, absl::nullopt otherwise.
static absl::optional<
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey>
OptionalLabelStringToKey(absl::string_view key);
// StatsPlugin:
std::pair<bool, std::shared_ptr<grpc_core::StatsPlugin::ScopeConfig>>
IsEnabledForChannel(
@ -455,6 +464,9 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
// Instruments for per-call metrics.
ClientMetrics client_;
ServerMetrics server_;
static constexpr int kOptionalLabelsSizeLimit = 64;
using OptionalLabelsBitSet = std::bitset<kOptionalLabelsSizeLimit>;
OptionalLabelsBitSet per_call_optional_label_bits_;
// Instruments for non-per-call metrics.
struct Disabled {};
using Instrument = absl::variant<
@ -464,8 +476,6 @@ class OpenTelemetryPlugin : public grpc_core::StatsPlugin {
std::unique_ptr<opentelemetry::metrics::Histogram<double>>,
std::unique_ptr<CallbackGaugeState<int64_t>>,
std::unique_ptr<CallbackGaugeState<double>>>;
static constexpr int kOptionalLabelsSizeLimit = 64;
using OptionalLabelsBitSet = std::bitset<kOptionalLabelsSizeLimit>;
struct InstrumentData {
Instrument instrument;
OptionalLabelsBitSet optional_labels_bits;

@ -113,7 +113,7 @@ void OpenTelemetryPlugin::ServerCallTracer::RecordEnd(
// Currently we do not have any optional labels on the server side.
KeyValueIterable labels(
injected_labels_from_plugin_options_, additional_labels,
/*active_plugin_options_view=*/nullptr, /*optional_labels_span=*/{},
/*active_plugin_options_view=*/nullptr, /*optional_labels=*/{},
/*is_client=*/false, otel_plugin_);
if (otel_plugin_->server_.call.duration != nullptr) {
otel_plugin_->server_.call.duration->Record(

@ -73,10 +73,9 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
void RecordAnnotation(absl::string_view annotation) override;
void RecordAnnotation(const Annotation& annotation) override;
std::shared_ptr<grpc_core::TcpTracerInterface> StartNewTcpTrace() override;
void AddOptionalLabels(
OptionalLabelComponent /*component*/,
std::shared_ptr<std::map<std::string, std::string>> /*labels*/)
override {}
void SetOptionalLabel(OptionalLabelKey /*key*/,
grpc_core::RefCountedStringValue /*value*/) override {
}
private:
// Maximum size of trace context is sent on the wire.

@ -99,10 +99,8 @@ class FakeCallTracer : public ClientCallTracer {
void RecordAnnotation(absl::string_view /*annotation*/) override {}
void RecordAnnotation(const Annotation& /*annotation*/) override {}
void AddOptionalLabels(
OptionalLabelComponent /*component*/,
std::shared_ptr<std::map<std::string, std::string>> /*labels*/)
override {}
void SetOptionalLabel(OptionalLabelKey /*key*/,
RefCountedStringValue /*value*/) override {}
static grpc_transport_stream_stats transport_stream_stats() {
MutexLock lock(g_mu);

@ -91,26 +91,22 @@ class FakeClientCallTracer : public ClientCallTracer {
std::shared_ptr<TcpTracerInterface> StartNewTcpTrace() override {
return nullptr;
}
void AddOptionalLabels(
OptionalLabelComponent component,
std::shared_ptr<std::map<std::string, std::string>> labels) override {
optional_labels_.emplace(component, std::move(labels));
void SetOptionalLabel(OptionalLabelKey key,
RefCountedStringValue value) override {
optional_labels_.emplace(key, std::move(value));
}
std::string TraceId() override { return ""; }
std::string SpanId() override { return ""; }
bool IsSampled() override { return false; }
const std::map<OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>>&
GetOptionalLabels() const {
const std::map<OptionalLabelKey, RefCountedStringValue>& GetOptionalLabels()
const {
return optional_labels_;
}
private:
std::vector<std::string>* annotation_logger_;
std::map<OptionalLabelComponent,
std::shared_ptr<std::map<std::string, std::string>>>
optional_labels_;
std::map<OptionalLabelKey, RefCountedStringValue> optional_labels_;
};
explicit FakeClientCallTracer(std::vector<std::string>* annotation_logger)

@ -1635,10 +1635,8 @@ TEST_F(TelemetryLabelTest, ValidServiceLabelsConfig) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(*resource.telemetry_labels,
::testing::UnorderedElementsAre(
::testing::Pair("service_name", "abc"),
::testing::Pair("service_namespace", "xyz")));
EXPECT_EQ(resource.service_telemetry_label.as_string_view(), "abc");
EXPECT_EQ(resource.namespace_telemetry_label.as_string_view(), "xyz");
}
TEST_F(TelemetryLabelTest, MissingMetadataField) {
@ -1653,7 +1651,10 @@ TEST_F(TelemetryLabelTest, MissingMetadataField) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.telemetry_labels, nullptr);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
}
TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
@ -1671,10 +1672,13 @@ TEST_F(TelemetryLabelTest, MissingCsmFilterMetadataField) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_EQ(resource.telemetry_labels, nullptr);
EXPECT_THAT(resource.service_telemetry_label.as_string_view(),
::testing::IsEmpty());
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
}
TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) {
TEST_F(TelemetryLabelTest, IgnoreNonServiceLabelEntries) {
Cluster cluster;
cluster.set_type(cluster.EDS);
cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self();
@ -1684,6 +1688,7 @@ TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) {
label_map["bool_value"].set_bool_value(true);
label_map["number_value"].set_number_value(3.14);
*label_map["string_value"].mutable_string_value() = "abc";
*label_map["service_name"].mutable_string_value() = "service";
label_map["null_value"].set_null_value(::google::protobuf::NULL_VALUE);
auto& list_value_values =
*label_map["list_value"].mutable_list_value()->mutable_values();
@ -1700,9 +1705,9 @@ TEST_F(TelemetryLabelTest, IgnoreNonStringEntries) {
ASSERT_TRUE(decode_result.resource.ok()) << decode_result.resource.status();
auto& resource =
static_cast<const XdsClusterResource&>(**decode_result.resource);
EXPECT_THAT(
*resource.telemetry_labels,
::testing::UnorderedElementsAre(::testing::Pair("string_value", "abc")));
EXPECT_THAT(resource.service_telemetry_label.as_string_view(), "service");
EXPECT_THAT(resource.namespace_telemetry_label.as_string_view(),
::testing::IsEmpty());
}
} // namespace

@ -44,8 +44,8 @@ using ::envoy::config::core::v3::HealthStatus;
using ::envoy::type::v3::FractionalPercent;
using ClientStats = LrsServiceImpl::ClientStats;
using OptionalLabelComponent =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
using OptionalLabelKey =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
constexpr char kLbDropType[] = "lb";
constexpr char kThrottleDropType[] = "throttle";
@ -336,38 +336,31 @@ TEST_P(CdsTest, MetricLabels) {
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(
::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace")))),
::testing::Pair(
OptionalLabelComponent::kXdsLocalityLabels,
::testing::Pointee(::testing::ElementsAre(::testing::Pair(
"grpc.lb.locality", LocalityNameString("locality0")))))));
::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
"mynamespace"),
::testing::Pair(OptionalLabelKey::kLocality,
LocalityNameString("locality0"))));
// Send an RPC to backend 1.
WaitForBackend(DEBUG_LOCATION, 1);
// Verify that the optional labels are recorded in the call tracer.
// Verify that the optional labels are recorded in the call
// tracer.
EXPECT_THAT(
fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
->GetLastCallAttemptTracer()
->GetOptionalLabels(),
::testing::ElementsAre(
::testing::Pair(
OptionalLabelComponent::kXdsServiceLabels,
::testing::Pointee(::testing::ElementsAre(
::testing::Pair("service_name", "myservice"),
::testing::Pair("service_namespace", "mynamespace")))),
::testing::Pair(
OptionalLabelComponent::kXdsLocalityLabels,
::testing::Pointee(::testing::ElementsAre(::testing::Pair(
"grpc.lb.locality", LocalityNameString("locality1")))))));
// TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary.
// The only reason it's here is to add a delay before
// fake_client_call_tracer_factory goes out of scope, since there may
// be lingering callbacks in the call stack that are using the
// CallAttemptTracer even after we get here, which would then cause a
// crash. Find a cleaner way to fix this.
::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
"mynamespace"),
::testing::Pair(OptionalLabelKey::kLocality,
LocalityNameString("locality1"))));
// TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. The
// only reason it's here is to add a delay before
// fake_client_call_tracer_factory goes out of scope, since there may be
// lingering callbacks in the call stack that are using the CallAttemptTracer
// even after we get here, which would then cause a crash. Find a cleaner way
// to fix this.
balancer_->Shutdown();
}

@ -43,6 +43,8 @@ namespace grpc {
namespace testing {
namespace {
using OptionalLabelKey =
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
using ::testing::ElementsAre;
using ::testing::Pair;
@ -155,9 +157,12 @@ class MetadataExchangeTest
: public OpenTelemetryPluginEnd2EndTest,
public ::testing::WithParamInterface<TestScenario> {
protected:
void Init(const std::vector<absl::string_view>& metric_names,
bool enable_client_side_injector = true,
std::map<std::string, std::string> labels_to_inject = {}) {
void Init(
const std::vector<absl::string_view>& metric_names,
bool enable_client_side_injector = true,
std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels_to_inject = {}) {
const char* kBootstrap =
"{\"node\": {\"id\": "
"\"projects/1234567890/networks/mesh:mesh-id/nodes/"
@ -406,7 +411,10 @@ TEST_P(MetadataExchangeTest, VerifyCsmServiceLabels) {
kClientAttemptDurationInstrumentName},
/*enable_client_side_injector=*/true,
// Injects CSM service labels to be recorded in the call.
{{"service_name", "myservice"}, {"service_namespace", "mynamespace"}});
{{OptionalLabelKey::kXdsServiceName,
grpc_core::RefCountedStringValue("myservice")},
{OptionalLabelKey::kXdsServiceNamespace,
grpc_core::RefCountedStringValue("mynamespace")}});
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(

@ -844,6 +844,170 @@ TEST_F(OpenTelemetryPluginEnd2EndTest,
EXPECT_EQ(*status_value, "UNIMPLEMENTED");
}
TEST_F(OpenTelemetryPluginEnd2EndTest, OptionalPerCallLocalityLabel) {
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.add_optional_label("grpc.lb.locality")
.set_labels_to_inject(
{{grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kLocality,
grpc_core::RefCountedStringValue("locality")}})));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains("grpc.client.attempt.started") ||
!data.contains("grpc.client.attempt.duration") ||
!data.contains("grpc.server.call.started") ||
!data.contains("grpc.server.call.duration");
});
// Verify client side metric (grpc.client.attempt.started) does not sees this
// label
ASSERT_EQ(data["grpc.client.attempt.started"].size(), 1);
const auto& client_attributes =
data["grpc.client.attempt.started"][0].attributes.GetAttributes();
EXPECT_THAT(
client_attributes,
::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality"))));
// Verify client side metric (grpc.client.attempt.duration) sees this label.
ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
const auto& client_duration_attributes =
data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
EXPECT_EQ(
absl::get<std::string>(client_duration_attributes.at("grpc.lb.locality")),
"locality");
// Verify server metric (grpc.server.call.started) does not see this label
ASSERT_EQ(data["grpc.server.call.started"].size(), 1);
const auto& server_attributes =
data["grpc.server.call.started"][0].attributes.GetAttributes();
EXPECT_THAT(
server_attributes,
::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality"))));
// Verify server metric (grpc.server.call.duration) does not see this label
ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
const auto& server_duration_attributes =
data["grpc.server.call.duration"][0].attributes.GetAttributes();
EXPECT_THAT(
server_duration_attributes,
::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality"))));
}
// Tests that when locality label is enabled on the plugin but not provided by
// gRPC, an empty value is recorded.
TEST_F(OpenTelemetryPluginEnd2EndTest,
OptionalPerCallLocalityLabelWhenNotAvailable) {
Init(std::move(
Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})
.add_optional_label("grpc.lb.locality")));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains("grpc.client.attempt.duration"); });
// Verify client side metric (grpc.client.attempt.duration) sees the empty
// label value.
ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
const auto& client_duration_attributes =
data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
EXPECT_EQ(
absl::get<std::string>(client_duration_attributes.at("grpc.lb.locality")),
"");
}
// Tests that when locality label is injected but not enabled by the plugin, the
// label is not recorded.
TEST_F(OpenTelemetryPluginEnd2EndTest,
OptionalPerCallLocalityLabelNotRecordedWhenNotEnabled) {
Init(std::move(
Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName})
.set_labels_to_inject(
{{grpc_core::ClientCallTracer::CallAttemptTracer::
OptionalLabelKey::kLocality,
grpc_core::RefCountedStringValue("locality")}})));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) { return !data.contains("grpc.client.attempt.duration"); });
// Verify client side metric (grpc.client.attempt.duration) does not see the
// locality label.
ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
const auto& client_duration_attributes =
data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
EXPECT_THAT(
client_duration_attributes,
::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality"))));
}
TEST_F(OpenTelemetryPluginEnd2EndTest,
UnknownLabelDoesNotShowOnPerCallMetrics) {
Init(
std::move(Options()
.set_metric_names({grpc::OpenTelemetryPluginBuilder::
kClientAttemptStartedInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kClientAttemptDurationInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallStartedInstrumentName,
grpc::OpenTelemetryPluginBuilder::
kServerCallDurationInstrumentName})
.add_optional_label("unknown")));
SendRPC();
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointDataAttributes>>&
data) {
return !data.contains("grpc.client.attempt.started") ||
!data.contains("grpc.client.attempt.duration") ||
!data.contains("grpc.server.call.started") ||
!data.contains("grpc.server.call.duration");
});
// Verify client side metric (grpc.client.attempt.started) does not sees this
// label
ASSERT_EQ(data["grpc.client.attempt.started"].size(), 1);
const auto& client_attributes =
data["grpc.client.attempt.started"][0].attributes.GetAttributes();
EXPECT_THAT(client_attributes,
::testing::Not(::testing::Contains(::testing::Key("unknown"))));
// Verify client side metric (grpc.client.attempt.duration) does not see this
// label
ASSERT_EQ(data["grpc.client.attempt.duration"].size(), 1);
const auto& client_duration_attributes =
data["grpc.client.attempt.duration"][0].attributes.GetAttributes();
EXPECT_THAT(client_duration_attributes,
::testing::Not(::testing::Contains(::testing::Key("unknown"))));
// Verify server metric (grpc.server.call.started) does not see this label
ASSERT_EQ(data["grpc.server.call.started"].size(), 1);
const auto& server_attributes =
data["grpc.server.call.started"][0].attributes.GetAttributes();
EXPECT_THAT(
server_attributes,
::testing::Not(::testing::Contains(::testing::Key("grpc.lb.locality"))));
// Verify server metric (grpc.server.call.duration) does not see this label
ASSERT_EQ(data["grpc.server.call.duration"].size(), 1);
const auto& server_duration_attributes =
data["grpc.server.call.duration"][0].attributes.GetAttributes();
EXPECT_THAT(server_duration_attributes,
::testing::Not(::testing::Contains(::testing::Key("unknown"))));
}
using OpenTelemetryPluginOptionEnd2EndTest = OpenTelemetryPluginEnd2EndTest;
class SimpleLabelIterable : public grpc::internal::LabelsIterable {
@ -886,20 +1050,18 @@ class CustomLabelInjector : public grpc::internal::LabelsInjector {
grpc::internal::LabelsIterable* /*labels_from_incoming_metadata*/)
const override {}
bool AddOptionalLabels(
bool /*is_client*/,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
/*optional_labels_span*/,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
/*callback*/) const override {
bool AddOptionalLabels(bool /*is_client*/,
absl::Span<const grpc_core::RefCountedStringValue>
/*optional_labels*/,
opentelemetry::nostd::function_ref<
bool(opentelemetry::nostd::string_view,
opentelemetry::common::AttributeValue)>
/*callback*/) const override {
return true;
}
size_t GetOptionalLabelsSize(
bool /*is_client*/,
absl::Span<const std::shared_ptr<std::map<std::string, std::string>>>
bool /*is_client*/, absl::Span<const grpc_core::RefCountedStringValue>
/*optional_labels_span*/) const override {
return 0;
}

@ -46,15 +46,16 @@ namespace testing {
// A subchannel filter that adds the service labels for test to the
// CallAttemptTracer in a call.
class AddServiceLabelsFilter : public grpc_core::ChannelFilter {
class AddLabelsFilter : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<AddServiceLabelsFilter> Create(
static absl::StatusOr<AddLabelsFilter> Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
return AddServiceLabelsFilter(
args.GetPointer<const std::map<std::string, std::string>>(
GRPC_ARG_LABELS_TO_INJECT));
return AddLabelsFilter(
*args.GetPointer<std::map<
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>>(GRPC_ARG_LABELS_TO_INJECT));
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
@ -65,23 +66,27 @@ class AddServiceLabelsFilter : public grpc_core::ChannelFilter {
auto* call_tracer = static_cast<CallAttemptTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
EXPECT_NE(call_tracer, nullptr);
call_tracer->AddOptionalLabels(
CallAttemptTracer::OptionalLabelComponent::kXdsServiceLabels,
std::make_shared<std::map<std::string, std::string>>(
*labels_to_inject_));
for (const auto& pair : labels_to_inject_) {
call_tracer->SetOptionalLabel(pair.first, pair.second);
}
return next_promise_factory(std::move(call_args));
}
private:
explicit AddServiceLabelsFilter(
const std::map<std::string, std::string>* labels_to_inject)
: labels_to_inject_(labels_to_inject) {}
explicit AddLabelsFilter(
std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels_to_inject)
: labels_to_inject_(std::move(labels_to_inject)) {}
const std::map<std::string, std::string>* labels_to_inject_;
const std::map<
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels_to_inject_;
};
const grpc_channel_filter AddServiceLabelsFilter::kFilter =
grpc_core::MakePromiseBasedFilter<AddServiceLabelsFilter,
const grpc_channel_filter AddLabelsFilter::kFilter =
grpc_core::MakePromiseBasedFilter<AddLabelsFilter,
grpc_core::FilterEndpoint::kClient>(
"add_service_labels_filter");
@ -135,11 +140,11 @@ void OpenTelemetryPluginEnd2EndTest::Init(Options config) {
grpc_core::CoreConfiguration::Reset();
ChannelArguments channel_args;
if (!config.labels_to_inject.empty()) {
labels_to_inject_ = config.labels_to_inject;
labels_to_inject_ = std::move(config.labels_to_inject);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) mutable {
builder->channel_init()->RegisterFilter(
GRPC_CLIENT_SUBCHANNEL, &AddServiceLabelsFilter::kFilter);
builder->channel_init()->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
&AddLabelsFilter::kFilter);
});
channel_args.SetPointer(GRPC_ARG_LABELS_TO_INJECT, &labels_to_inject_);
}

@ -78,7 +78,11 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
return *this;
}
Options& set_labels_to_inject(std::map<std::string, std::string> labels) {
Options& set_labels_to_inject(
std::map<
grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels) {
labels_to_inject = std::move(labels);
return *this;
}
@ -132,7 +136,9 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
opentelemetry::sdk::resource::Resource::Create({}));
std::unique_ptr<grpc::internal::LabelsInjector> labels_injector;
bool use_meter_provider = true;
std::map<std::string, std::string> labels_to_inject;
std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels_to_inject;
absl::AnyInvocable<bool(
const OpenTelemetryPluginBuilder::ChannelScope& /*scope*/) const>
channel_scope_filter;
@ -199,7 +205,9 @@ class OpenTelemetryPluginEnd2EndTest : public ::testing::Test {
const absl::string_view kMethodName = "grpc.testing.EchoTestService/Echo";
const absl::string_view kGenericMethodName = "foo/bar";
std::map<std::string, std::string> labels_to_inject_;
std::map<grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey,
grpc_core::RefCountedStringValue>
labels_to_inject_;
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_;
std::string server_address_;
std::string canonical_server_address_;

Loading…
Cancel
Save