diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 1a7a27a00ae..953e1a4bc3d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -199,206 +199,153 @@ class XdsLb : public LoadBalancingPolicy { LoadBalancingPolicy* child_ = nullptr; }; - // There is only one PriorityList instance, which has the same lifetime with - // the XdsLb instance. - class PriorityList { + // Each LocalityMap holds a ref to the XdsLb. + class LocalityMap : public InternallyRefCounted { public: - // Each LocalityMap holds a ref to the XdsLb. - class LocalityMap : public InternallyRefCounted { + // Each Locality holds a ref to the LocalityMap it is in. + class Locality : public InternallyRefCounted { public: - // Each Locality holds a ref to the LocalityMap it is in. - class Locality : public InternallyRefCounted { - public: - Locality(RefCountedPtr locality_map, - RefCountedPtr name); - ~Locality(); - - void UpdateLocked(uint32_t locality_weight, - ServerAddressList serverlist); - void ShutdownLocked(); - void ResetBackoffLocked(); - void DeactivateLocked(); - void Orphan() override; - - grpc_connectivity_state connectivity_state() const { - return connectivity_state_; - } - uint32_t weight() const { return weight_; } - RefCountedPtr picker_wrapper() const { - return picker_wrapper_; - } - - void set_locality_map(RefCountedPtr locality_map) { - locality_map_ = std::move(locality_map); - } - - private: - class Helper : public ChannelControlHelper { - public: - explicit Helper(RefCountedPtr locality) - : locality_(std::move(locality)) {} - - ~Helper() { locality_.reset(DEBUG_LOCATION, "Helper"); } - - RefCountedPtr CreateSubchannel( - const grpc_channel_args& args) override; - void UpdateState(grpc_connectivity_state state, - std::unique_ptr picker) override; - // This is a no-op, because we get the addresses from the xds - // client, which is a watch-based API. - void RequestReresolution() override {} - void AddTraceEvent(TraceSeverity severity, - StringView message) override; - void set_child(LoadBalancingPolicy* child) { child_ = child; } - - private: - bool CalledByPendingChild() const; - bool CalledByCurrentChild() const; - - RefCountedPtr locality_; - LoadBalancingPolicy* child_ = nullptr; - }; - - // Methods for dealing with the child policy. - OrphanablePtr CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args); - grpc_channel_args* CreateChildPolicyArgsLocked( - const grpc_channel_args* args); - - static void OnDelayedRemovalTimer(void* arg, grpc_error* error); - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); - - XdsLb* xds_policy() const { return locality_map_->xds_policy(); } - - // The owning locality map. - RefCountedPtr locality_map_; - - RefCountedPtr name_; - OrphanablePtr child_policy_; - OrphanablePtr pending_child_policy_; - RefCountedPtr picker_wrapper_; - grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; - uint32_t weight_; - - // States for delayed removal. - grpc_timer delayed_removal_timer_; - grpc_closure on_delayed_removal_timer_; - bool delayed_removal_timer_callback_pending_ = false; - bool shutdown_ = false; - }; - - LocalityMap(RefCountedPtr xds_policy, uint32_t priority); + Locality(RefCountedPtr locality_map, + RefCountedPtr name); + ~Locality(); - ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); } - - void UpdateLocked( - const XdsPriorityListUpdate::LocalityMap& locality_map_update); + void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist); + void ShutdownLocked(); void ResetBackoffLocked(); - void UpdateXdsPickerLocked(); - OrphanablePtr ExtractLocalityLocked( - const RefCountedPtr& name); void DeactivateLocked(); - // Returns true if this locality map becomes the currently used one (i.e., - // its priority is selected) after reactivation. - bool MaybeReactivateLocked(); - void MaybeCancelFailoverTimerLocked(); - void Orphan() override; - XdsLb* xds_policy() const { return xds_policy_.get(); } - uint32_t priority() const { return priority_; } grpc_connectivity_state connectivity_state() const { return connectivity_state_; } - bool failover_timer_callback_pending() const { - return failover_timer_callback_pending_; + uint32_t weight() const { return weight_; } + RefCountedPtr picker_wrapper() const { + return picker_wrapper_; + } + + void set_locality_map(RefCountedPtr locality_map) { + locality_map_ = std::move(locality_map); } private: - void OnLocalityStateUpdateLocked(); - void UpdateConnectivityStateLocked(); + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr locality) + : locality_(std::move(locality)) {} + + ~Helper() { locality_.reset(DEBUG_LOCATION, "Helper"); } + + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; + void UpdateState(grpc_connectivity_state state, + std::unique_ptr picker) override; + // This is a no-op, because we get the addresses from the xds + // client, which is a watch-based API. + void RequestReresolution() override {} + void AddTraceEvent(TraceSeverity severity, StringView message) override; + void set_child(LoadBalancingPolicy* child) { child_ = child; } + + private: + bool CalledByPendingChild() const; + bool CalledByCurrentChild() const; + + RefCountedPtr locality_; + LoadBalancingPolicy* child_ = nullptr; + }; + + // Methods for dealing with the child policy. + OrphanablePtr CreateChildPolicyLocked( + const char* name, const grpc_channel_args* args); + grpc_channel_args* CreateChildPolicyArgsLocked( + const grpc_channel_args* args); + static void OnDelayedRemovalTimer(void* arg, grpc_error* error); - static void OnFailoverTimer(void* arg, grpc_error* error); static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); - static void OnFailoverTimerLocked(void* arg, grpc_error* error); - PriorityList* priority_list() const { - return &xds_policy_->priority_list_; - } - const XdsPriorityListUpdate& priority_list_update() const { - return xds_policy_->priority_list_update_; - } - const XdsPriorityListUpdate::LocalityMap* locality_map_update() const { - return xds_policy_->priority_list_update_.Find(priority_); - } + XdsLb* xds_policy() const { return locality_map_->xds_policy(); } - RefCountedPtr xds_policy_; + // The owning locality map. + RefCountedPtr locality_map_; - std::map, OrphanablePtr, - XdsLocalityName::Less> - localities_; - const uint32_t priority_; + RefCountedPtr name_; + OrphanablePtr child_policy_; + OrphanablePtr pending_child_policy_; + RefCountedPtr picker_wrapper_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; + uint32_t weight_; // States for delayed removal. grpc_timer delayed_removal_timer_; grpc_closure on_delayed_removal_timer_; bool delayed_removal_timer_callback_pending_ = false; - - // States of failover. - grpc_timer failover_timer_; - grpc_closure on_failover_timer_; - bool failover_timer_callback_pending_ = false; + bool shutdown_ = false; }; - explicit PriorityList(XdsLb* xds_policy) : xds_policy_(xds_policy) {} + LocalityMap(RefCountedPtr xds_policy, uint32_t priority); + + ~LocalityMap() { xds_policy_.reset(DEBUG_LOCATION, "LocalityMap"); } - void UpdateLocked(); + void UpdateLocked( + const XdsPriorityListUpdate::LocalityMap& locality_map_update); void ResetBackoffLocked(); - void ShutdownLocked(); void UpdateXdsPickerLocked(); + OrphanablePtr ExtractLocalityLocked( + const RefCountedPtr& name); + void DeactivateLocked(); + // Returns true if this locality map becomes the currently used one (i.e., + // its priority is selected) after reactivation. + bool MaybeReactivateLocked(); + void MaybeCancelFailoverTimerLocked(); + + void Orphan() override; + + XdsLb* xds_policy() const { return xds_policy_.get(); } + uint32_t priority() const { return priority_; } + grpc_connectivity_state connectivity_state() const { + return connectivity_state_; + } + bool failover_timer_callback_pending() const { + return failover_timer_callback_pending_; + } + + private: + void OnLocalityStateUpdateLocked(); + void UpdateConnectivityStateLocked(); + static void OnDelayedRemovalTimer(void* arg, grpc_error* error); + static void OnFailoverTimer(void* arg, grpc_error* error); + static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + static void OnFailoverTimerLocked(void* arg, grpc_error* error); const XdsPriorityListUpdate& priority_list_update() const { return xds_policy_->priority_list_update_; } - uint32_t current_priority() const { return current_priority_; } - - private: - void MaybeCreateLocalityMapLocked(uint32_t priority); - void FailoverOnConnectionFailureLocked(); - void FailoverOnDisconnectionLocked(uint32_t failed_priority); - void SwitchToHigherPriorityLocked(uint32_t priority); - void DeactivatePrioritiesLowerThan(uint32_t priority); - OrphanablePtr ExtractLocalityLocked( - const RefCountedPtr& name, uint32_t exclude_priority); - // Callers should make sure the priority list is non-empty. - uint32_t LowestPriority() const { - return static_cast(priorities_.size()) - 1; + const XdsPriorityListUpdate::LocalityMap* locality_map_update() const { + return xds_policy_->priority_list_update_.Find(priority_); } - bool Contains(uint32_t priority) { return priority < priorities_.size(); } - XdsLb* xds_policy_; + RefCountedPtr xds_policy_; - // The list of locality maps, indexed by priority. P0 is the highest - // priority. - InlinedVector, 2> priorities_; - // The priority that is being used. - uint32_t current_priority_ = UINT32_MAX; + std::map, OrphanablePtr, + XdsLocalityName::Less> + localities_; + const uint32_t priority_; + grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; + + // States for delayed removal. + grpc_timer delayed_removal_timer_; + grpc_closure on_delayed_removal_timer_; + bool delayed_removal_timer_callback_pending_ = false; + + // States of failover. + grpc_timer failover_timer_; + grpc_closure on_failover_timer_; + bool failover_timer_callback_pending_ = false; }; ~XdsLb(); void ShutdownLocked() override; - // Methods for dealing with fallback state. - void MaybeCancelFallbackAtStartupChecks(); - static void OnFallbackTimer(void* arg, grpc_error* error); - static void OnFallbackTimerLocked(void* arg, grpc_error* error); - void UpdateFallbackPolicyLocked(); - OrphanablePtr CreateFallbackPolicyLocked( - const char* name, const grpc_channel_args* args); - void MaybeExitFallbackMode(); - const char* eds_service_name() const { if (config_ != nullptr && config_->eds_service_name() != nullptr) { return config_->eds_service_name(); @@ -411,6 +358,30 @@ class XdsLb : public LoadBalancingPolicy { : xds_client_.get(); } + void UpdatePrioritiesLocked(); + void UpdateXdsPickerLocked(); + void MaybeCreateLocalityMapLocked(uint32_t priority); + void FailoverOnConnectionFailureLocked(); + void FailoverOnDisconnectionLocked(uint32_t failed_priority); + void SwitchToHigherPriorityLocked(uint32_t priority); + void DeactivatePrioritiesLowerThan(uint32_t priority); + OrphanablePtr ExtractLocalityLocked( + const RefCountedPtr& name, uint32_t exclude_priority); + // Callers should make sure the priority list is non-empty. + uint32_t LowestPriority() const { + return static_cast(priorities_.size()) - 1; + } + bool Contains(uint32_t priority) { return priority < priorities_.size(); } + + // Methods for dealing with fallback state. + void MaybeCancelFallbackAtStartupChecks(); + static void OnFallbackTimer(void* arg, grpc_error* error); + static void OnFallbackTimerLocked(void* arg, grpc_error* error); + void UpdateFallbackPolicyLocked(); + OrphanablePtr CreateFallbackPolicyLocked( + const char* name, const grpc_channel_args* args); + void MaybeExitFallbackMode(); + // Server name from target URI. std::string server_name_; @@ -454,8 +425,11 @@ class XdsLb : public LoadBalancingPolicy { const grpc_millis locality_retention_interval_ms_; const grpc_millis locality_map_failover_timeout_ms_; - // A list of locality maps indexed by priority. - PriorityList priority_list_; + // The list of locality maps, indexed by priority. P0 is the highest + // priority. + InlinedVector, 2> priorities_; + // The priority that is being used. + uint32_t current_priority_ = UINT32_MAX; // The update for priority_list_. XdsPriorityListUpdate priority_list_update_; @@ -642,13 +616,13 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface { xds_policy_.get(), drop_config_changed); } if (drop_config_changed) { - xds_policy_->priority_list_.UpdateXdsPickerLocked(); + xds_policy_->UpdateXdsPickerLocked(); } return; } // Update the priority list. xds_policy_->priority_list_update_ = std::move(update.priority_list_update); - xds_policy_->priority_list_.UpdateLocked(); + xds_policy_->UpdatePrioritiesLocked(); } void OnError(grpc_error* error) override { @@ -701,8 +675,7 @@ XdsLb::XdsLb(Args args) {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})), locality_map_failover_timeout_ms_(grpc_channel_args_find_integer( args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, - {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})), - priority_list_(this) { + {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})) { if (xds_client_from_channel_ != nullptr && GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this, @@ -735,7 +708,7 @@ void XdsLb::ShutdownLocked() { } shutting_down_ = true; MaybeCancelFallbackAtStartupChecks(); - priority_list_.ShutdownLocked(); + priorities_.clear(); if (fallback_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), interested_parties()); @@ -775,7 +748,9 @@ void XdsLb::ResetBackoffLocked() { // LB policy, this is done via the resolver, so we don't need to do it // for xds_client_from_channel_ here. if (xds_client_ != nullptr) xds_client_->ResetBackoff(); - priority_list_.ResetBackoffLocked(); + for (size_t i = 0; i < priorities_.size(); ++i) { + priorities_[i]->ResetBackoffLocked(); + } if (fallback_policy_ != nullptr) { fallback_policy_->ResetBackoffLocked(); } @@ -800,7 +775,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { args_ = args.args; args.args = nullptr; // Update priority list. - priority_list_.UpdateLocked(); + UpdatePrioritiesLocked(); // Update the existing fallback policy. The fallback policy config and/or the // fallback addresses may be new. if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); @@ -1047,17 +1022,16 @@ void XdsLb::MaybeExitFallbackMode() { } // -// XdsLb::PriorityList +// priority list-related methods // -void XdsLb::PriorityList::UpdateLocked() { - const auto& priority_list_update = xds_policy_->priority_list_update_; +void XdsLb::UpdatePrioritiesLocked() { // 1. Remove from the priority list the priorities that are not in the update. - DeactivatePrioritiesLowerThan(priority_list_update.LowestPriority()); + DeactivatePrioritiesLowerThan(priority_list_update_.LowestPriority()); // 2. Update all the existing priorities. for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { LocalityMap* locality_map = priorities_[priority].get(); - const auto* locality_map_update = priority_list_update.Find(priority); + const auto* locality_map_update = priority_list_update_.Find(priority); // Propagate locality_map_update. // TODO(juanlishen): Find a clean way to skip duplicate update for a // priority. @@ -1075,22 +1049,14 @@ void XdsLb::PriorityList::UpdateLocked() { } } -void XdsLb::PriorityList::ResetBackoffLocked() { - for (size_t i = 0; i < priorities_.size(); ++i) { - priorities_[i]->ResetBackoffLocked(); - } -} - -void XdsLb::PriorityList::ShutdownLocked() { priorities_.clear(); } - -void XdsLb::PriorityList::UpdateXdsPickerLocked() { +void XdsLb::UpdateXdsPickerLocked() { // If we are in fallback mode, don't generate an xds picker from localities. - if (xds_policy_->fallback_policy_ != nullptr) return; - if (current_priority() == UINT32_MAX) { + if (fallback_policy_ != nullptr) return; + if (current_priority_ == UINT32_MAX) { grpc_error* error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready locality map"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - xds_policy_->channel_control_helper()->UpdateState( + channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_core::MakeUnique(error)); return; @@ -1098,29 +1064,28 @@ void XdsLb::PriorityList::UpdateXdsPickerLocked() { priorities_[current_priority_]->UpdateXdsPickerLocked(); } -void XdsLb::PriorityList::MaybeCreateLocalityMapLocked(uint32_t priority) { +void XdsLb::MaybeCreateLocalityMapLocked(uint32_t priority) { // Exhausted priorities in the update. - if (!priority_list_update().Contains(priority)) return; - auto new_locality_map = new LocalityMap( - xds_policy_->Ref(DEBUG_LOCATION, "LocalityMap"), priority); + if (!priority_list_update_.Contains(priority)) return; + auto new_locality_map = + new LocalityMap(Ref(DEBUG_LOCATION, "LocalityMap"), priority); priorities_.emplace_back(OrphanablePtr(new_locality_map)); - new_locality_map->UpdateLocked(*priority_list_update().Find(priority)); + new_locality_map->UpdateLocked(*priority_list_update_.Find(priority)); } -void XdsLb::PriorityList::FailoverOnConnectionFailureLocked() { +void XdsLb::FailoverOnConnectionFailureLocked() { const uint32_t failed_priority = LowestPriority(); // If we're failing over from the lowest priority, report TRANSIENT_FAILURE. - if (failed_priority == priority_list_update().LowestPriority()) { + if (failed_priority == priority_list_update_.LowestPriority()) { UpdateXdsPickerLocked(); } MaybeCreateLocalityMapLocked(failed_priority + 1); } -void XdsLb::PriorityList::FailoverOnDisconnectionLocked( - uint32_t failed_priority) { +void XdsLb::FailoverOnDisconnectionLocked(uint32_t failed_priority) { current_priority_ = UINT32_MAX; for (uint32_t next_priority = failed_priority + 1; - next_priority <= priority_list_update().LowestPriority(); + next_priority <= priority_list_update_.LowestPriority(); ++next_priority) { if (!Contains(next_priority)) { MaybeCreateLocalityMapLocked(next_priority); @@ -1130,17 +1095,17 @@ void XdsLb::PriorityList::FailoverOnDisconnectionLocked( } } -void XdsLb::PriorityList::SwitchToHigherPriorityLocked(uint32_t priority) { +void XdsLb::SwitchToHigherPriorityLocked(uint32_t priority) { current_priority_ = priority; DeactivatePrioritiesLowerThan(current_priority_); UpdateXdsPickerLocked(); } -void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) { +void XdsLb::DeactivatePrioritiesLowerThan(uint32_t priority) { if (priorities_.empty()) return; // Deactivate the locality maps from the lowest priority. for (uint32_t p = LowestPriority(); p > priority; --p) { - if (xds_policy_->locality_retention_interval_ms_ == 0) { + if (locality_retention_interval_ms_ == 0) { priorities_.pop_back(); } else { priorities_[p]->DeactivateLocked(); @@ -1148,8 +1113,7 @@ void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) { } } -OrphanablePtr -XdsLb::PriorityList::ExtractLocalityLocked( +OrphanablePtr XdsLb::ExtractLocalityLocked( const RefCountedPtr& name, uint32_t exclude_priority) { for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { if (priority == exclude_priority) continue; @@ -1161,11 +1125,11 @@ XdsLb::PriorityList::ExtractLocalityLocked( } // -// XdsLb::PriorityList::LocalityMap +// XdsLb::LocalityMap // -XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, - uint32_t priority) +XdsLb::LocalityMap::LocalityMap(RefCountedPtr xds_policy, + uint32_t priority) : xds_policy_(std::move(xds_policy)), priority_(priority) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, @@ -1189,7 +1153,7 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, } } -void XdsLb::PriorityList::LocalityMap::UpdateLocked( +void XdsLb::LocalityMap::UpdateLocked( const XdsPriorityListUpdate::LocalityMap& locality_map_update) { if (xds_policy_->shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { @@ -1221,7 +1185,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateLocked( OrphanablePtr& locality = localities_[name]; if (locality == nullptr) { // Move from another locality map if possible. - locality = priority_list()->ExtractLocalityLocked(name, priority_); + locality = xds_policy_->ExtractLocalityLocked(name, priority_); if (locality != nullptr) { locality->set_locality_map( Ref(DEBUG_LOCATION, "LocalityMap+Locality_move")); @@ -1237,11 +1201,11 @@ void XdsLb::PriorityList::LocalityMap::UpdateLocked( } } -void XdsLb::PriorityList::LocalityMap::ResetBackoffLocked() { +void XdsLb::LocalityMap::ResetBackoffLocked() { for (auto& p : localities_) p.second->ResetBackoffLocked(); } -void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { +void XdsLb::LocalityMap::UpdateXdsPickerLocked() { // Construct a new xds picker which maintains a map of all locality pickers // that are ready. Each locality is represented by a portion of the range // proportional to its weight, such that the total range is the sum of the @@ -1264,8 +1228,8 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { std::move(picker_list))); } -OrphanablePtr -XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked( +OrphanablePtr +XdsLb::LocalityMap::ExtractLocalityLocked( const RefCountedPtr& name) { for (auto iter = localities_.begin(); iter != localities_.end(); ++iter) { const auto& name_in_map = iter->first; @@ -1278,7 +1242,7 @@ XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked( return nullptr; } -void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { +void XdsLb::LocalityMap::DeactivateLocked() { // If already deactivated, don't do it again. if (delayed_removal_timer_callback_pending_) return; MaybeCancelFailoverTimerLocked(); @@ -1299,24 +1263,24 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { delayed_removal_timer_callback_pending_ = true; } -bool XdsLb::PriorityList::LocalityMap::MaybeReactivateLocked() { +bool XdsLb::LocalityMap::MaybeReactivateLocked() { // Don't reactivate a priority that is not higher than the current one. - if (priority_ >= priority_list()->current_priority()) return false; + if (priority_ >= xds_policy_->current_priority_) return false; // Reactivate this priority by cancelling deletion timer. if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); } // Switch to this higher priority if it's READY. if (connectivity_state_ != GRPC_CHANNEL_READY) return false; - priority_list()->SwitchToHigherPriorityLocked(priority_); + xds_policy_->SwitchToHigherPriorityLocked(priority_); return true; } -void XdsLb::PriorityList::LocalityMap::MaybeCancelFailoverTimerLocked() { +void XdsLb::LocalityMap::MaybeCancelFailoverTimerLocked() { if (failover_timer_callback_pending_) grpc_timer_cancel(&failover_timer_); } -void XdsLb::PriorityList::LocalityMap::Orphan() { +void XdsLb::LocalityMap::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " orphaned.", xds_policy(), priority_); @@ -1329,11 +1293,11 @@ void XdsLb::PriorityList::LocalityMap::Orphan() { Unref(DEBUG_LOCATION, "LocalityMap+Orphan"); } -void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { +void XdsLb::LocalityMap::OnLocalityStateUpdateLocked() { UpdateConnectivityStateLocked(); // Ignore priorities not in priority_list_update. if (!priority_list_update().Contains(priority_)) return; - const uint32_t current_priority = priority_list()->current_priority(); + const uint32_t current_priority = xds_policy_->current_priority_; // Ignore lower-than-current priorities. if (priority_ > current_priority) return; // Maybe update fallback state. @@ -1347,13 +1311,13 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { if (connectivity_state_ == GRPC_CHANNEL_READY) { MaybeCancelFailoverTimerLocked(); // If a higher-than-current priority becomes READY, switch to use it. - priority_list()->SwitchToHigherPriorityLocked(priority_); + xds_policy_->SwitchToHigherPriorityLocked(priority_); } else if (connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { // If a higher-than-current priority becomes TRANSIENT_FAILURE, only // handle it if it's the priority that is still in failover timeout. if (failover_timer_callback_pending_) { MaybeCancelFailoverTimerLocked(); - priority_list()->FailoverOnConnectionFailureLocked(); + xds_policy_->FailoverOnConnectionFailureLocked(); } } return; @@ -1361,7 +1325,7 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { // Update is for current priority. if (connectivity_state_ != GRPC_CHANNEL_READY) { // Fail over if it's no longer READY. - priority_list()->FailoverOnDisconnectionLocked(priority_); + xds_policy_->FailoverOnDisconnectionLocked(priority_); } // At this point, one of the following things has happened to the current // priority. @@ -1369,10 +1333,10 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { // 2. It changed to a lower priority due to failover. // 3. It became invalid because failover didn't yield a READY priority. // In any case, update the xds picker. - priority_list()->UpdateXdsPickerLocked(); + xds_policy_->UpdateXdsPickerLocked(); } -void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { +void XdsLb::LocalityMap::UpdateConnectivityStateLocked() { size_t num_ready = 0; size_t num_connecting = 0; size_t num_idle = 0; @@ -1420,8 +1384,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { } } -void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( - void* arg, grpc_error* error) { +void XdsLb::LocalityMap::OnDelayedRemovalTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); self->xds_policy_->combiner()->Run( GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, @@ -1429,14 +1392,13 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( GRPC_ERROR_REF(error)); } -void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( - void* arg, grpc_error* error) { +void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(void* arg, + grpc_error* error) { LocalityMap* self = static_cast(arg); self->delayed_removal_timer_callback_pending_ = false; if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - auto* priority_list = self->priority_list(); const bool keep = self->priority_list_update().Contains(self->priority_) && - self->priority_ <= priority_list->current_priority(); + self->priority_ <= self->xds_policy_->current_priority_; if (!keep) { // This check is to make sure we always delete the locality maps from // the lowest priority even if the closures of the back-to-back timers @@ -1445,8 +1407,8 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( // deactivated locality maps when out-of-order closures are run. // TODO(juanlishen): Check the timer implementation to see if this // defense is necessary. - if (self->priority_ == priority_list->LowestPriority()) { - priority_list->priorities_.pop_back(); + if (self->priority_ == self->xds_policy_->LowestPriority()) { + self->xds_policy_->priorities_.pop_back(); } else { gpr_log(GPR_ERROR, "[xdslb %p] Priority %" PRIu32 @@ -1459,8 +1421,7 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); } -void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, - grpc_error* error) { +void XdsLb::LocalityMap::OnFailoverTimer(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); self->xds_policy_->combiner()->Run( GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, self, @@ -1468,23 +1429,21 @@ void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, GRPC_ERROR_REF(error)); } -void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( - void* arg, grpc_error* error) { +void XdsLb::LocalityMap::OnFailoverTimerLocked(void* arg, grpc_error* error) { LocalityMap* self = static_cast(arg); self->failover_timer_callback_pending_ = false; if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { - self->priority_list()->FailoverOnConnectionFailureLocked(); + self->xds_policy_->FailoverOnConnectionFailureLocked(); } self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); } // -// XdsLb::PriorityList::LocalityMap::Locality +// XdsLb::LocalityMap::Locality // -XdsLb::PriorityList::LocalityMap::Locality::Locality( - RefCountedPtr locality_map, - RefCountedPtr name) +XdsLb::LocalityMap::Locality::Locality(RefCountedPtr locality_map, + RefCountedPtr name) : locality_map_(std::move(locality_map)), name_(std::move(name)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), @@ -1492,7 +1451,7 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality( } } -XdsLb::PriorityList::LocalityMap::Locality::~Locality() { +XdsLb::LocalityMap::Locality::~Locality() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: destroying locality", xds_policy(), this, name_->AsHumanReadableString()); @@ -1500,8 +1459,7 @@ XdsLb::PriorityList::LocalityMap::Locality::~Locality() { locality_map_.reset(DEBUG_LOCATION, "Locality"); } -grpc_channel_args* -XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked( +grpc_channel_args* XdsLb::LocalityMap::Locality::CreateChildPolicyArgsLocked( const grpc_channel_args* args_in) { const grpc_arg args_to_add[] = { // A channel arg indicating if the target is a backend inferred from a @@ -1519,7 +1477,7 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked( } OrphanablePtr -XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( +XdsLb::LocalityMap::Locality::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); LoadBalancingPolicy::Args lb_policy_args; @@ -1551,8 +1509,8 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( return lb_policy; } -void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( - uint32_t locality_weight, ServerAddressList serverlist) { +void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight, + ServerAddressList serverlist) { if (xds_policy()->shutting_down_) return; // Update locality weight. weight_ = locality_weight; @@ -1661,7 +1619,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( policy_to_update->UpdateLocked(std::move(update_args)); } -void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() { +void XdsLb::LocalityMap::Locality::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality", xds_policy(), this, name_->AsHumanReadableString()); @@ -1686,19 +1644,19 @@ void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() { shutdown_ = true; } -void XdsLb::PriorityList::LocalityMap::Locality::ResetBackoffLocked() { +void XdsLb::LocalityMap::Locality::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); if (pending_child_policy_ != nullptr) { pending_child_policy_->ResetBackoffLocked(); } } -void XdsLb::PriorityList::LocalityMap::Locality::Orphan() { +void XdsLb::LocalityMap::Locality::Orphan() { ShutdownLocked(); Unref(); } -void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { +void XdsLb::LocalityMap::Locality::DeactivateLocked() { // If already deactivated, don't do that again. if (weight_ == 0) return; // Set the locality weight to 0 so that future xds picker won't contain this @@ -1715,8 +1673,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { delayed_removal_timer_callback_pending_ = true; } -void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( - void* arg, grpc_error* error) { +void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimer(void* arg, + grpc_error* error) { Locality* self = static_cast(arg); self->xds_policy()->combiner()->Run( GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, @@ -1724,7 +1682,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( GRPC_ERROR_REF(error)); } -void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( +void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked( void* arg, grpc_error* error) { Locality* self = static_cast(arg); self->delayed_removal_timer_callback_pending_ = false; @@ -1738,20 +1696,18 @@ void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( // XdsLb::Locality::Helper // -bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByPendingChild() - const { +bool XdsLb::LocalityMap::Locality::Helper::CalledByPendingChild() const { GPR_ASSERT(child_ != nullptr); return child_ == locality_->pending_child_policy_.get(); } -bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByCurrentChild() - const { +bool XdsLb::LocalityMap::Locality::Helper::CalledByCurrentChild() const { GPR_ASSERT(child_ != nullptr); return child_ == locality_->child_policy_.get(); } RefCountedPtr -XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel( +XdsLb::LocalityMap::Locality::Helper::CreateSubchannel( const grpc_channel_args& args) { if (locality_->xds_policy()->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { @@ -1761,7 +1717,7 @@ XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel( args); } -void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( +void XdsLb::LocalityMap::Locality::Helper::UpdateState( grpc_connectivity_state state, std::unique_ptr picker) { if (locality_->xds_policy()->shutting_down_) return; // If this request is from the pending child policy, ignore it until @@ -1797,8 +1753,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( locality_->locality_map_->OnLocalityStateUpdateLocked(); } -void XdsLb::PriorityList::LocalityMap::Locality::Helper::AddTraceEvent( - TraceSeverity severity, StringView message) { +void XdsLb::LocalityMap::Locality::Helper::AddTraceEvent(TraceSeverity severity, + StringView message) { if (locality_->xds_policy()->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { return;