change xds_cluster_impl to use new LRS API

pull/37605/head
Mark D. Roth 3 months ago
parent 1beea74ebb
commit e09ce612cd
  1. 37
      src/core/load_balancing/xds/xds_cluster_impl.cc
  2. 5
      src/core/xds/grpc/xds_client_grpc.cc
  3. 4
      src/core/xds/grpc/xds_client_grpc.h
  4. 2
      src/core/xds/xds_client/xds_client.h

@ -189,13 +189,13 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
private: private:
class StatsSubchannelWrapper final : public DelegatingSubchannel { class StatsSubchannelWrapper final : public DelegatingSubchannel {
public: public:
// If load reporting is enabled and we have an XdsClusterLocalityStats // If load reporting is enabled and we have a ClusterLocalityStats
// object, that object already contains the locality label. We // object, that object already contains the locality label. We
// need to store the locality label directly only in the case where // need to store the locality label directly only in the case where
// load reporting is disabled. // load reporting is disabled.
using LocalityData = absl::variant< using LocalityData = absl::variant<
RefCountedStringValue /*locality*/, RefCountedStringValue /*locality*/,
RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>; RefCountedPtr<LrsClient::ClusterLocalityStats> /*locality_stats*/>;
StatsSubchannelWrapper( StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel, RefCountedPtr<SubchannelInterface> wrapped_subchannel,
@ -209,18 +209,20 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
return Match( return Match(
locality_data_, locality_data_,
[](RefCountedStringValue locality) { return locality; }, [](RefCountedStringValue locality) { return locality; },
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) { [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
locality_stats) {
return locality_stats->locality_name()->human_readable_string(); return locality_stats->locality_name()->human_readable_string();
}); });
} }
XdsClusterLocalityStats* locality_stats() const { LrsClient::ClusterLocalityStats* locality_stats() const {
return Match( return Match(
locality_data_, locality_data_,
[](const RefCountedStringValue&) { [](const RefCountedStringValue&) {
return static_cast<XdsClusterLocalityStats*>(nullptr); return static_cast<LrsClient::ClusterLocalityStats*>(nullptr);
}, },
[](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) { [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
locality_stats) {
return locality_stats.get(); return locality_stats.get();
}); });
} }
@ -250,7 +252,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
RefCountedStringValue service_telemetry_label_; RefCountedStringValue service_telemetry_label_;
RefCountedStringValue namespace_telemetry_label_; RefCountedStringValue namespace_telemetry_label_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_; RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_; RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
RefCountedPtr<SubchannelPicker> picker_; RefCountedPtr<SubchannelPicker> picker_;
}; };
@ -304,7 +306,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
RefCountedPtr<GrpcXdsClient> xds_client_; RefCountedPtr<GrpcXdsClient> xds_client_;
// The stats for client-side load reporting. // The stats for client-side load reporting.
RefCountedPtr<XdsClusterDropStats> drop_stats_; RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
OrphanablePtr<LoadBalancingPolicy> child_policy_; OrphanablePtr<LoadBalancingPolicy> child_policy_;
@ -324,7 +326,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final
SubchannelCallTracker( SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker, original_subchannel_call_tracker,
RefCountedPtr<XdsClusterLocalityStats> locality_stats, RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter) RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
: original_subchannel_call_tracker_( : original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)), std::move(original_subchannel_call_tracker)),
@ -380,7 +382,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final
private: private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
original_subchannel_call_tracker_; original_subchannel_call_tracker_;
RefCountedPtr<XdsClusterLocalityStats> locality_stats_; RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats_;
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_; RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
#ifndef NDEBUG #ifndef NDEBUG
bool started_ = false; bool started_ = false;
@ -454,7 +456,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
subchannel_wrapper->locality()); subchannel_wrapper->locality());
} }
// Handle load reporting. // Handle load reporting.
RefCountedPtr<XdsClusterLocalityStats> locality_stats; RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
if (subchannel_wrapper->locality_stats() != nullptr) { if (subchannel_wrapper->locality_stats() != nullptr) {
locality_stats = subchannel_wrapper->locality_stats()->Ref( locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker"); DEBUG_LOCATION, "SubchannelCallTracker");
@ -624,7 +626,7 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
old_eds_service_name != new_eds_service_name || old_eds_service_name != new_eds_service_name ||
cluster_resource_->lrs_load_reporting_server != cluster_resource_->lrs_load_reporting_server !=
new_cluster_config.cluster->lrs_load_reporting_server) { new_cluster_config.cluster->lrs_load_reporting_server) {
drop_stats_ = xds_client_->AddClusterDropStats( drop_stats_ = xds_client_->lrs_client().AddClusterDropStats(
*new_cluster_config.cluster->lrs_load_reporting_server, *new_cluster_config.cluster->lrs_load_reporting_server,
new_config->cluster_name(), new_eds_service_name); new_config->cluster_name(), new_eds_service_name);
if (drop_stats_ == nullptr) { if (drop_stats_ == nullptr) {
@ -819,12 +821,13 @@ RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
// (if load reporting is enabled) the locality stats object, which // (if load reporting is enabled) the locality stats object, which
// will be used by the picker. // will be used by the picker.
auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>(); auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
RefCountedPtr<XdsClusterLocalityStats> locality_stats; RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) { if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
locality_stats = parent()->xds_client_->AddClusterLocalityStats( locality_stats =
parent()->cluster_resource_->lrs_load_reporting_server.value(), parent()->xds_client_->lrs_client().AddClusterLocalityStats(
parent()->config_->cluster_name(), parent()->cluster_resource_->lrs_load_reporting_server.value(),
GetEdsResourceName(*parent()->cluster_resource_), locality_name); parent()->config_->cluster_name(),
GetEdsResourceName(*parent()->cluster_resource_), locality_name);
if (locality_stats == nullptr) { if (locality_stats == nullptr) {
LOG(ERROR) LOG(ERROR)
<< "[xds_cluster_impl_lb " << parent() << "[xds_cluster_impl_lb " << parent()

@ -338,6 +338,11 @@ void GrpcXdsClient::Orphaned() {
} }
} }
void GrpcXdsClient::ResetBackoff() {
XdsClient::ResetBackoff();
lrs_client_->ResetBackoff();
}
grpc_pollset_set* GrpcXdsClient::interested_parties() const { grpc_pollset_set* GrpcXdsClient::interested_parties() const {
return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory()) return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
->interested_parties(); ->interested_parties();

@ -74,6 +74,8 @@ class GrpcXdsClient final : public XdsClient {
return QsortCompare(a, b); return QsortCompare(a, b);
} }
void ResetBackoff() override;
grpc_pollset_set* interested_parties() const; grpc_pollset_set* interested_parties() const;
CertificateProviderStore& certificate_provider_store() const { CertificateProviderStore& certificate_provider_store() const {
@ -82,7 +84,7 @@ class GrpcXdsClient final : public XdsClient {
absl::string_view key() const { return key_; } absl::string_view key() const { return key_; }
RefCountedPtr<LrsClient> lrs_client() const { return lrs_client_; } LrsClient& lrs_client() { return *lrs_client_; }
// Builds ClientStatusResponse containing all resources from all XdsClients // Builds ClientStatusResponse containing all resources from all XdsClients
static grpc_slice DumpAllClientConfigs(); static grpc_slice DumpAllClientConfigs();

@ -148,7 +148,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
XdsClusterLocalityStats* cluster_locality_stats); XdsClusterLocalityStats* cluster_locality_stats);
// Resets connection backoff state. // Resets connection backoff state.
void ResetBackoff(); virtual void ResetBackoff();
grpc_event_engine::experimental::EventEngine* engine() { grpc_event_engine::experimental::EventEngine* engine() {
return engine_.get(); return engine_.get();

Loading…
Cancel
Save