From e09ce612cd5fd53f5fb8f66a786c414ca3a90234 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 30 Aug 2024 22:17:07 +0000 Subject: [PATCH] change xds_cluster_impl to use new LRS API --- .../load_balancing/xds/xds_cluster_impl.cc | 37 ++++++++++--------- src/core/xds/grpc/xds_client_grpc.cc | 5 +++ src/core/xds/grpc/xds_client_grpc.h | 4 +- src/core/xds/xds_client/xds_client.h | 2 +- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/core/load_balancing/xds/xds_cluster_impl.cc b/src/core/load_balancing/xds/xds_cluster_impl.cc index e66330a21da..630f7f7db31 100644 --- a/src/core/load_balancing/xds/xds_cluster_impl.cc +++ b/src/core/load_balancing/xds/xds_cluster_impl.cc @@ -189,13 +189,13 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { private: class StatsSubchannelWrapper final : public DelegatingSubchannel { public: - // If load reporting is enabled and we have an XdsClusterLocalityStats + // If load reporting is enabled and we have a ClusterLocalityStats // object, that object already contains the locality label. We // need to store the locality label directly only in the case where // load reporting is disabled. using LocalityData = absl::variant< RefCountedStringValue /*locality*/, - RefCountedPtr /*locality_stats*/>; + RefCountedPtr /*locality_stats*/>; StatsSubchannelWrapper( RefCountedPtr wrapped_subchannel, @@ -209,18 +209,20 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { return Match( locality_data_, [](RefCountedStringValue locality) { return locality; }, - [](const RefCountedPtr& locality_stats) { + [](const RefCountedPtr& + locality_stats) { return locality_stats->locality_name()->human_readable_string(); }); } - XdsClusterLocalityStats* locality_stats() const { + LrsClient::ClusterLocalityStats* locality_stats() const { return Match( locality_data_, [](const RefCountedStringValue&) { - return static_cast(nullptr); + return static_cast(nullptr); }, - [](const RefCountedPtr& locality_stats) { + [](const RefCountedPtr& + locality_stats) { return locality_stats.get(); }); } @@ -250,7 +252,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { RefCountedStringValue service_telemetry_label_; RefCountedStringValue namespace_telemetry_label_; RefCountedPtr drop_config_; - RefCountedPtr drop_stats_; + RefCountedPtr drop_stats_; RefCountedPtr picker_; }; @@ -304,7 +306,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy { RefCountedPtr xds_client_; // The stats for client-side load reporting. - RefCountedPtr drop_stats_; + RefCountedPtr drop_stats_; OrphanablePtr child_policy_; @@ -324,7 +326,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final SubchannelCallTracker( std::unique_ptr original_subchannel_call_tracker, - RefCountedPtr locality_stats, + RefCountedPtr locality_stats, RefCountedPtr call_counter) : original_subchannel_call_tracker_( std::move(original_subchannel_call_tracker)), @@ -380,7 +382,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final private: std::unique_ptr original_subchannel_call_tracker_; - RefCountedPtr locality_stats_; + RefCountedPtr locality_stats_; RefCountedPtr call_counter_; #ifndef NDEBUG bool started_ = false; @@ -454,7 +456,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick( subchannel_wrapper->locality()); } // Handle load reporting. - RefCountedPtr locality_stats; + RefCountedPtr locality_stats; if (subchannel_wrapper->locality_stats() != nullptr) { locality_stats = subchannel_wrapper->locality_stats()->Ref( DEBUG_LOCATION, "SubchannelCallTracker"); @@ -624,7 +626,7 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) { old_eds_service_name != new_eds_service_name || cluster_resource_->lrs_load_reporting_server != new_cluster_config.cluster->lrs_load_reporting_server) { - drop_stats_ = xds_client_->AddClusterDropStats( + drop_stats_ = xds_client_->lrs_client().AddClusterDropStats( *new_cluster_config.cluster->lrs_load_reporting_server, new_config->cluster_name(), new_eds_service_name); if (drop_stats_ == nullptr) { @@ -819,12 +821,13 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel( // (if load reporting is enabled) the locality stats object, which // will be used by the picker. auto locality_name = per_address_args.GetObjectRef(); - RefCountedPtr locality_stats; + RefCountedPtr locality_stats; if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) { - locality_stats = parent()->xds_client_->AddClusterLocalityStats( - parent()->cluster_resource_->lrs_load_reporting_server.value(), - parent()->config_->cluster_name(), - GetEdsResourceName(*parent()->cluster_resource_), locality_name); + locality_stats = + parent()->xds_client_->lrs_client().AddClusterLocalityStats( + parent()->cluster_resource_->lrs_load_reporting_server.value(), + parent()->config_->cluster_name(), + GetEdsResourceName(*parent()->cluster_resource_), locality_name); if (locality_stats == nullptr) { LOG(ERROR) << "[xds_cluster_impl_lb " << parent() diff --git a/src/core/xds/grpc/xds_client_grpc.cc b/src/core/xds/grpc/xds_client_grpc.cc index ad6d96d1fcd..ca2d91cb7fc 100644 --- a/src/core/xds/grpc/xds_client_grpc.cc +++ b/src/core/xds/grpc/xds_client_grpc.cc @@ -338,6 +338,11 @@ void GrpcXdsClient::Orphaned() { } } +void GrpcXdsClient::ResetBackoff() { + XdsClient::ResetBackoff(); + lrs_client_->ResetBackoff(); +} + grpc_pollset_set* GrpcXdsClient::interested_parties() const { return reinterpret_cast(transport_factory()) ->interested_parties(); diff --git a/src/core/xds/grpc/xds_client_grpc.h b/src/core/xds/grpc/xds_client_grpc.h index c68344614dc..fd5d46f9f6b 100644 --- a/src/core/xds/grpc/xds_client_grpc.h +++ b/src/core/xds/grpc/xds_client_grpc.h @@ -74,6 +74,8 @@ class GrpcXdsClient final : public XdsClient { return QsortCompare(a, b); } + void ResetBackoff() override; + grpc_pollset_set* interested_parties() const; CertificateProviderStore& certificate_provider_store() const { @@ -82,7 +84,7 @@ class GrpcXdsClient final : public XdsClient { absl::string_view key() const { return key_; } - RefCountedPtr lrs_client() const { return lrs_client_; } + LrsClient& lrs_client() { return *lrs_client_; } // Builds ClientStatusResponse containing all resources from all XdsClients static grpc_slice DumpAllClientConfigs(); diff --git a/src/core/xds/xds_client/xds_client.h b/src/core/xds/xds_client/xds_client.h index bc72da75e16..50d5c11c0a2 100644 --- a/src/core/xds/xds_client/xds_client.h +++ b/src/core/xds/xds_client/xds_client.h @@ -148,7 +148,7 @@ class XdsClient : public DualRefCounted { XdsClusterLocalityStats* cluster_locality_stats); // Resets connection backoff state. - void ResetBackoff(); + virtual void ResetBackoff(); grpc_event_engine::experimental::EventEngine* engine() { return engine_.get();