From db9ca10b351da70cd34359f8ea5ea00cbb4ffc1a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 4 Mar 2024 17:41:31 -0800 Subject: [PATCH] [xDS] add optional locality label to per-call metrics (#36049) Closes #36049 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36049 from markdroth:xds_locality_label_on_per_call_metrics 9c5aeb7fb14a229e80baf6ed267f8ead5c5fd044 PiperOrigin-RevId: 612648213 --- src/core/BUILD | 1 + src/core/ext/xds/xds_client_stats.h | 12 +- src/core/lib/channel/call_tracer.h | 5 +- .../load_balancing/xds/xds_cluster_impl.cc | 119 ++++++++++++------ .../end2end/xds/xds_cluster_end2end_test.cc | 58 ++++++--- test/cpp/end2end/xds/xds_utils.cc | 6 + test/cpp/end2end/xds/xds_utils.h | 3 + test/cpp/end2end/xds/xds_wrr_end2end_test.cc | 6 - 8 files changed, 147 insertions(+), 63 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 3f80297081c..f0c58279faf 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5277,6 +5277,7 @@ grpc_cc_library( "lb_policy", "lb_policy_factory", "lb_policy_registry", + "match", "pollset_set", "ref_counted", "resolved_address", diff --git a/src/core/ext/xds/xds_client_stats.h b/src/core/ext/xds/xds_client_stats.h index 55385fa2e90..5f78e156768 100644 --- a/src/core/ext/xds/xds_client_stats.h +++ b/src/core/ext/xds/xds_client_stats.h @@ -63,7 +63,11 @@ class XdsLocalityName : public RefCounted { XdsLocalityName(std::string region, std::string zone, std::string sub_zone) : region_(std::move(region)), zone_(std::move(zone)), - sub_zone_(std::move(sub_zone)) {} + sub_zone_(std::move(sub_zone)), + locality_labels_( + std::make_shared>()) { + locality_labels_->emplace("grpc.lb.locality", AsHumanReadableString()); + } bool operator==(const XdsLocalityName& other) const { return region_ == other.region_ && zone_ == other.zone_ && @@ -85,6 +89,9 @@ class XdsLocalityName : public RefCounted { const std::string& region() const { return region_; } const std::string& zone() const { return zone_; } const std::string& sub_zone() const { return sub_zone_; } + std::shared_ptr> locality_labels() const { + return locality_labels_; + } std::string AsHumanReadableString() const { return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", @@ -104,6 +111,7 @@ class XdsLocalityName : public RefCounted { std::string region_; std::string zone_; std::string sub_zone_; + std::shared_ptr> locality_labels_; }; // Drop stats for an xds cluster. @@ -223,6 +231,8 @@ class XdsClusterLocalityStats : public RefCounted { void AddCallFinished(const std::map* named_metrics, bool fail = false); + XdsLocalityName* locality_name() const { return name_.get(); } + private: struct Stats { std::atomic total_successful_requests{0}; diff --git a/src/core/lib/channel/call_tracer.h b/src/core/lib/channel/call_tracer.h index f189f311963..54d5e2c8508 100644 --- a/src/core/lib/channel/call_tracer.h +++ b/src/core/lib/channel/call_tracer.h @@ -128,8 +128,9 @@ class ClientCallTracer : public CallTracerAnnotationInterface { class CallAttemptTracer : public CallTracerInterface { public: enum class OptionalLabelComponent : std::uint8_t { - kXdsServiceLabels = 0, - kSize = 1, // keep last + kXdsServiceLabels, + kXdsLocalityLabels, + kSize, // keep last }; ~CallAttemptTracer() override {} diff --git a/src/core/load_balancing/xds/xds_cluster_impl.cc b/src/core/load_balancing/xds/xds_cluster_impl.cc index c4a6b1dddd8..fc5c5e32b7a 100644 --- a/src/core/load_balancing/xds/xds_cluster_impl.cc +++ b/src/core/load_balancing/xds/xds_cluster_impl.cc @@ -51,6 +51,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -191,18 +192,46 @@ class XdsClusterImplLb : public LoadBalancingPolicy { private: class StatsSubchannelWrapper : public DelegatingSubchannel { public: + // If load reporting is enabled and we have an XdsClusterLocalityStats + // object, that object already contains the locality labels. We + // need to store the locality labels directly only in the case where + // load reporting is disabled. + using LocalityData = absl::variant< + std::shared_ptr> /*locality_labels*/, + RefCountedPtr /*locality_stats*/>; + StatsSubchannelWrapper( RefCountedPtr wrapped_subchannel, - RefCountedPtr locality_stats) + LocalityData locality_data) : DelegatingSubchannel(std::move(wrapped_subchannel)), - locality_stats_(std::move(locality_stats)) {} + locality_data_(std::move(locality_data)) {} + + std::shared_ptr> locality_labels() + const { + return Match( + locality_data_, + [](const std::shared_ptr>& + locality_labels) { + return locality_labels; + }, + [](const RefCountedPtr& locality_stats) { + return locality_stats->locality_name()->locality_labels(); + }); + } XdsClusterLocalityStats* locality_stats() const { - return locality_stats_.get(); + return Match( + locality_data_, + [](const std::shared_ptr>&) { + return static_cast(nullptr); + }, + [](const RefCountedPtr& locality_stats) { + return locality_stats.get(); + }); } private: - RefCountedPtr locality_stats_; + LocalityData locality_data_; }; // A picker that wraps the picker from the child to perform drops. @@ -375,8 +404,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb, LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( LoadBalancingPolicy::PickArgs args) { auto* call_state = static_cast(args.call_state); - if (call_state->GetCallAttemptTracer() != nullptr) { - call_state->GetCallAttemptTracer()->AddOptionalLabels( + auto* call_attempt_tracer = call_state->GetCallAttemptTracer(); + if (call_attempt_tracer != nullptr) { + call_attempt_tracer->AddOptionalLabels( OptionalLabelComponent::kXdsServiceLabels, service_labels_); } // Handle EDS drops. @@ -404,16 +434,22 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( PickResult result = picker_->Pick(args); auto* complete_pick = absl::get_if(&result.result); if (complete_pick != nullptr) { + auto* subchannel_wrapper = + static_cast(complete_pick->subchannel.get()); + // Add locality labels to per-call metrics if needed. + if (call_attempt_tracer != nullptr) { + call_attempt_tracer->AddOptionalLabels( + OptionalLabelComponent::kXdsLocalityLabels, + subchannel_wrapper->locality_labels()); + } + // Handle load reporting. RefCountedPtr locality_stats; - if (drop_stats_ != nullptr) { // If load reporting is enabled. - auto* subchannel_wrapper = - static_cast(complete_pick->subchannel.get()); - // Handle load reporting. + if (subchannel_wrapper->locality_stats() != nullptr) { locality_stats = subchannel_wrapper->locality_stats()->Ref( DEBUG_LOCATION, "SubchannelCallTracker"); - // Unwrap subchannel to pass back up the stack. - complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel(); } + // Unwrap subchannel to pass back up the stack. + complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel(); // Inject subchannel call tracker to record call completion. complete_pick->subchannel_call_tracker = std::make_unique( @@ -743,36 +779,39 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( const grpc_resolved_address& address, const ChannelArgs& per_address_args, const ChannelArgs& args) { if (parent()->shutting_down_) return nullptr; - // If load reporting is enabled, wrap the subchannel such that it - // includes the locality stats object, which will be used by the Picker. + // Wrap the subchannel so that we pass along the locality labels and + // (if load reporting is enabled) the locality stats object, which + // will be used by the picker. + auto locality_name = per_address_args.GetObjectRef(); + RefCountedPtr locality_stats; if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) { - auto locality_name = per_address_args.GetObjectRef(); - RefCountedPtr locality_stats = - parent()->xds_client_->AddClusterLocalityStats( - parent()->cluster_resource_->lrs_load_reporting_server.value(), - parent()->config_->cluster_name(), - GetEdsResourceName(*parent()->cluster_resource_), - std::move(locality_name)); - if (locality_stats != nullptr) { - return MakeRefCounted( - parent()->channel_control_helper()->CreateSubchannel( - address, per_address_args, args), - std::move(locality_stats)); + locality_stats = parent()->xds_client_->AddClusterLocalityStats( + parent()->cluster_resource_->lrs_load_reporting_server.value(), + parent()->config_->cluster_name(), + GetEdsResourceName(*parent()->cluster_resource_), locality_name); + if (locality_stats == nullptr) { + gpr_log(GPR_ERROR, + "[xds_cluster_impl_lb %p] Failed to get locality stats object " + "for LRS server %s, cluster %s, EDS service name %s; load " + "reports will not be generated", + parent(), + parent() + ->cluster_resource_->lrs_load_reporting_server->server_uri() + .c_str(), + parent()->config_->cluster_name().c_str(), + GetEdsResourceName(*parent()->cluster_resource_).c_str()); } - gpr_log(GPR_ERROR, - "[xds_cluster_impl_lb %p] Failed to get locality stats object for " - "LRS server %s, cluster %s, EDS service name %s; load reports will " - "not be generated (not wrapping subchannel)", - parent(), - parent() - ->cluster_resource_->lrs_load_reporting_server->server_uri() - .c_str(), - parent()->config_->cluster_name().c_str(), - GetEdsResourceName(*parent()->cluster_resource_).c_str()); - } - // Load reporting not enabled, so don't wrap the subchannel. - return parent()->channel_control_helper()->CreateSubchannel( - address, per_address_args, args); + } + StatsSubchannelWrapper::LocalityData locality_data; + if (locality_stats != nullptr) { + locality_data = std::move(locality_stats); + } else { + locality_data = locality_name->locality_labels(); + } + return MakeRefCounted( + parent()->channel_control_helper()->CreateSubchannel( + address, per_address_args, args), + std::move(locality_data)); } void XdsClusterImplLb::Helper::UpdateState( diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 18562fee895..2bef38e84b8 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -309,12 +309,13 @@ TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) { WaitForBackend(DEBUG_LOCATION, 1); } -TEST_P(CdsTest, VerifyCsmServiceLabelsParsing) { +TEST_P(CdsTest, MetricLabels) { // Injects a fake client call tracer factory. Try keep this at top. grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory; - CreateAndStartBackends(1); + CreateAndStartBackends(2); // Populates EDS resources. - EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}, + {"locality1", CreateEndpointsForBackends(1, 2)}}); balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); // Populates service labels to CDS resources. auto cluster = default_cluster_; @@ -328,17 +329,46 @@ TEST_P(CdsTest, VerifyCsmServiceLabelsParsing) { channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY, &fake_client_call_tracer_factory); ResetStub(/*failover_timeout_ms=*/0, &channel_args); - // Sends an RPC and verifies that the service labels are recorded in the fake - // client call tracer. - CheckRpcSendOk(DEBUG_LOCATION); - EXPECT_THAT(fake_client_call_tracer_factory.GetLastFakeClientCallTracer() - ->GetLastCallAttemptTracer() - ->GetOptionalLabels(), - ::testing::ElementsAre(::testing::Pair( - OptionalLabelComponent::kXdsServiceLabels, - ::testing::Pointee(::testing::ElementsAre( - ::testing::Pair("service_name", "myservice"), - ::testing::Pair("service_namespace", "mynamespace")))))); + // Send an RPC to backend 0. + WaitForBackend(DEBUG_LOCATION, 0); + // Verify that the optional labels are recorded in the call tracer. + EXPECT_THAT( + fake_client_call_tracer_factory.GetLastFakeClientCallTracer() + ->GetLastCallAttemptTracer() + ->GetOptionalLabels(), + ::testing::ElementsAre( + ::testing::Pair( + OptionalLabelComponent::kXdsServiceLabels, + ::testing::Pointee(::testing::ElementsAre( + ::testing::Pair("service_name", "myservice"), + ::testing::Pair("service_namespace", "mynamespace")))), + ::testing::Pair( + OptionalLabelComponent::kXdsLocalityLabels, + ::testing::Pointee(::testing::ElementsAre(::testing::Pair( + "grpc.lb.locality", LocalityNameString("locality0"))))))); + // Send an RPC to backend 1. + WaitForBackend(DEBUG_LOCATION, 1); + // Verify that the optional labels are recorded in the call tracer. + EXPECT_THAT( + fake_client_call_tracer_factory.GetLastFakeClientCallTracer() + ->GetLastCallAttemptTracer() + ->GetOptionalLabels(), + ::testing::ElementsAre( + ::testing::Pair( + OptionalLabelComponent::kXdsServiceLabels, + ::testing::Pointee(::testing::ElementsAre( + ::testing::Pair("service_name", "myservice"), + ::testing::Pair("service_namespace", "mynamespace")))), + ::testing::Pair( + OptionalLabelComponent::kXdsLocalityLabels, + ::testing::Pointee(::testing::ElementsAre(::testing::Pair( + "grpc.lb.locality", LocalityNameString("locality1"))))))); + // TODO(yashkt, yijiem): This shutdown shouldn't actually be necessary. + // The only reason it's here is to add a delay before + // fake_client_call_tracer_factory goes out of scope, since there may + // be lingering callbacks in the call stack that are using the + // CallAttemptTracer even after we get here, which would then cause a + // crash. Find a cleaner way to fix this. balancer_->Shutdown(); } diff --git a/test/cpp/end2end/xds/xds_utils.cc b/test/cpp/end2end/xds/xds_utils.cc index 28f58988169..b1d42bdde79 100644 --- a/test/cpp/end2end/xds/xds_utils.cc +++ b/test/cpp/end2end/xds/xds_utils.cc @@ -317,6 +317,12 @@ void XdsResourceUtils::SetRouteConfiguration( } } +std::string XdsResourceUtils::LocalityNameString(absl::string_view sub_zone) { + return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", + kDefaultLocalityRegion, kDefaultLocalityZone, + sub_zone); +} + ClusterLoadAssignment XdsResourceUtils::BuildEdsResource( const EdsResourceArgs& args, absl::string_view eds_service_name) { ClusterLoadAssignment assignment; diff --git a/test/cpp/end2end/xds/xds_utils.h b/test/cpp/end2end/xds/xds_utils.h index 6acfbc6cbe6..e12d36c2cfb 100644 --- a/test/cpp/end2end/xds/xds_utils.h +++ b/test/cpp/end2end/xds/xds_utils.h @@ -195,6 +195,9 @@ class XdsResourceUtils { bool use_rds = false, const Listener* listener_to_copy = nullptr); + // Returns a string representing the locality with the specified sub_zone. + static std::string LocalityNameString(absl::string_view sub_zone); + // Arguments for constructing an EDS resource. struct EdsResourceArgs { // An individual endpoint for a backend running on a specified port. diff --git a/test/cpp/end2end/xds/xds_wrr_end2end_test.cc b/test/cpp/end2end/xds/xds_wrr_end2end_test.cc index 16799140b8a..a89a3021e73 100644 --- a/test/cpp/end2end/xds/xds_wrr_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_wrr_end2end_test.cc @@ -48,12 +48,6 @@ class WrrTest : public XdsEnd2endTest { void SetUp() override { // No-op -- tests must explicitly call InitClient(). } - - static std::string LocalityNameString(absl::string_view sub_zone) { - return absl::StrFormat("{region=\"%s\", zone=\"%s\", sub_zone=\"%s\"}", - kDefaultLocalityRegion, kDefaultLocalityZone, - sub_zone); - } }; INSTANTIATE_TEST_SUITE_P(XdsTest, WrrTest, ::testing::Values(XdsTestType()),