Remove PriorityList from xds LB policy

pull/21813/head
Mark D. Roth 5 years ago
parent 25d6f5f18f
commit aa851e9e30
  1. 244
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc

@ -199,10 +199,6 @@ class XdsLb : public LoadBalancingPolicy {
LoadBalancingPolicy* child_ = nullptr; LoadBalancingPolicy* child_ = nullptr;
}; };
// There is only one PriorityList instance, which has the same lifetime with
// the XdsLb instance.
class PriorityList {
public:
// Each LocalityMap holds a ref to the XdsLb. // Each LocalityMap holds a ref to the XdsLb.
class LocalityMap : public InternallyRefCounted<LocalityMap> { class LocalityMap : public InternallyRefCounted<LocalityMap> {
public: public:
@ -213,8 +209,7 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr<XdsLocalityName> name); RefCountedPtr<XdsLocalityName> name);
~Locality(); ~Locality();
void UpdateLocked(uint32_t locality_weight, void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist);
ServerAddressList serverlist);
void ShutdownLocked(); void ShutdownLocked();
void ResetBackoffLocked(); void ResetBackoffLocked();
void DeactivateLocked(); void DeactivateLocked();
@ -247,8 +242,7 @@ class XdsLb : public LoadBalancingPolicy {
// This is a no-op, because we get the addresses from the xds // This is a no-op, because we get the addresses from the xds
// client, which is a watch-based API. // client, which is a watch-based API.
void RequestReresolution() override {} void RequestReresolution() override {}
void AddTraceEvent(TraceSeverity severity, void AddTraceEvent(TraceSeverity severity, StringView message) override;
StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; } void set_child(LoadBalancingPolicy* child) { child_ = child; }
private: private:
@ -322,9 +316,6 @@ class XdsLb : public LoadBalancingPolicy {
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
static void OnFailoverTimerLocked(void* arg, grpc_error* error); static void OnFailoverTimerLocked(void* arg, grpc_error* error);
PriorityList* priority_list() const {
return &xds_policy_->priority_list_;
}
const XdsPriorityListUpdate& priority_list_update() const { const XdsPriorityListUpdate& priority_list_update() const {
return xds_policy_->priority_list_update_; return xds_policy_->priority_list_update_;
} }
@ -351,19 +342,24 @@ class XdsLb : public LoadBalancingPolicy {
bool failover_timer_callback_pending_ = false; bool failover_timer_callback_pending_ = false;
}; };
explicit PriorityList(XdsLb* xds_policy) : xds_policy_(xds_policy) {} ~XdsLb();
void UpdateLocked(); void ShutdownLocked() override;
void ResetBackoffLocked();
void ShutdownLocked();
void UpdateXdsPickerLocked();
const XdsPriorityListUpdate& priority_list_update() const { const char* eds_service_name() const {
return xds_policy_->priority_list_update_; if (config_ != nullptr && config_->eds_service_name() != nullptr) {
return config_->eds_service_name();
}
return server_name_.c_str();
} }
uint32_t current_priority() const { return current_priority_; }
private: XdsClient* xds_client() const {
return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get()
: xds_client_.get();
}
void UpdatePrioritiesLocked();
void UpdateXdsPickerLocked();
void MaybeCreateLocalityMapLocked(uint32_t priority); void MaybeCreateLocalityMapLocked(uint32_t priority);
void FailoverOnConnectionFailureLocked(); void FailoverOnConnectionFailureLocked();
void FailoverOnDisconnectionLocked(uint32_t failed_priority); void FailoverOnDisconnectionLocked(uint32_t failed_priority);
@ -377,19 +373,6 @@ class XdsLb : public LoadBalancingPolicy {
} }
bool Contains(uint32_t priority) { return priority < priorities_.size(); } bool Contains(uint32_t priority) { return priority < priorities_.size(); }
XdsLb* xds_policy_;
// The list of locality maps, indexed by priority. P0 is the highest
// priority.
InlinedVector<OrphanablePtr<LocalityMap>, 2> priorities_;
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
};
~XdsLb();
void ShutdownLocked() override;
// Methods for dealing with fallback state. // Methods for dealing with fallback state.
void MaybeCancelFallbackAtStartupChecks(); void MaybeCancelFallbackAtStartupChecks();
static void OnFallbackTimer(void* arg, grpc_error* error); static void OnFallbackTimer(void* arg, grpc_error* error);
@ -399,18 +382,6 @@ class XdsLb : public LoadBalancingPolicy {
const char* name, const grpc_channel_args* args); const char* name, const grpc_channel_args* args);
void MaybeExitFallbackMode(); void MaybeExitFallbackMode();
const char* eds_service_name() const {
if (config_ != nullptr && config_->eds_service_name() != nullptr) {
return config_->eds_service_name();
}
return server_name_.c_str();
}
XdsClient* xds_client() const {
return xds_client_from_channel_ != nullptr ? xds_client_from_channel_.get()
: xds_client_.get();
}
// Server name from target URI. // Server name from target URI.
std::string server_name_; std::string server_name_;
@ -454,8 +425,11 @@ class XdsLb : public LoadBalancingPolicy {
const grpc_millis locality_retention_interval_ms_; const grpc_millis locality_retention_interval_ms_;
const grpc_millis locality_map_failover_timeout_ms_; const grpc_millis locality_map_failover_timeout_ms_;
// A list of locality maps indexed by priority. // The list of locality maps, indexed by priority. P0 is the highest
PriorityList priority_list_; // priority.
InlinedVector<OrphanablePtr<LocalityMap>, 2> priorities_;
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
// The update for priority_list_. // The update for priority_list_.
XdsPriorityListUpdate priority_list_update_; XdsPriorityListUpdate priority_list_update_;
@ -642,13 +616,13 @@ class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
xds_policy_.get(), drop_config_changed); xds_policy_.get(), drop_config_changed);
} }
if (drop_config_changed) { if (drop_config_changed) {
xds_policy_->priority_list_.UpdateXdsPickerLocked(); xds_policy_->UpdateXdsPickerLocked();
} }
return; return;
} }
// Update the priority list. // Update the priority list.
xds_policy_->priority_list_update_ = std::move(update.priority_list_update); xds_policy_->priority_list_update_ = std::move(update.priority_list_update);
xds_policy_->priority_list_.UpdateLocked(); xds_policy_->UpdatePrioritiesLocked();
} }
void OnError(grpc_error* error) override { void OnError(grpc_error* error) override {
@ -701,8 +675,7 @@ XdsLb::XdsLb(Args args)
{GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})), {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})),
locality_map_failover_timeout_ms_(grpc_channel_args_find_integer( locality_map_failover_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})), {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})) {
priority_list_(this) {
if (xds_client_from_channel_ != nullptr && if (xds_client_from_channel_ != nullptr &&
GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this, gpr_log(GPR_INFO, "[xdslb %p] Using xds client %p from channel", this,
@ -735,7 +708,7 @@ void XdsLb::ShutdownLocked() {
} }
shutting_down_ = true; shutting_down_ = true;
MaybeCancelFallbackAtStartupChecks(); MaybeCancelFallbackAtStartupChecks();
priority_list_.ShutdownLocked(); priorities_.clear();
if (fallback_policy_ != nullptr) { if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
interested_parties()); interested_parties());
@ -775,7 +748,9 @@ void XdsLb::ResetBackoffLocked() {
// LB policy, this is done via the resolver, so we don't need to do it // LB policy, this is done via the resolver, so we don't need to do it
// for xds_client_from_channel_ here. // for xds_client_from_channel_ here.
if (xds_client_ != nullptr) xds_client_->ResetBackoff(); if (xds_client_ != nullptr) xds_client_->ResetBackoff();
priority_list_.ResetBackoffLocked(); for (size_t i = 0; i < priorities_.size(); ++i) {
priorities_[i]->ResetBackoffLocked();
}
if (fallback_policy_ != nullptr) { if (fallback_policy_ != nullptr) {
fallback_policy_->ResetBackoffLocked(); fallback_policy_->ResetBackoffLocked();
} }
@ -800,7 +775,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) {
args_ = args.args; args_ = args.args;
args.args = nullptr; args.args = nullptr;
// Update priority list. // Update priority list.
priority_list_.UpdateLocked(); UpdatePrioritiesLocked();
// Update the existing fallback policy. The fallback policy config and/or the // Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new. // fallback addresses may be new.
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
@ -1047,17 +1022,16 @@ void XdsLb::MaybeExitFallbackMode() {
} }
// //
// XdsLb::PriorityList // priority list-related methods
// //
void XdsLb::PriorityList::UpdateLocked() { void XdsLb::UpdatePrioritiesLocked() {
const auto& priority_list_update = xds_policy_->priority_list_update_;
// 1. Remove from the priority list the priorities that are not in the update. // 1. Remove from the priority list the priorities that are not in the update.
DeactivatePrioritiesLowerThan(priority_list_update.LowestPriority()); DeactivatePrioritiesLowerThan(priority_list_update_.LowestPriority());
// 2. Update all the existing priorities. // 2. Update all the existing priorities.
for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { for (uint32_t priority = 0; priority < priorities_.size(); ++priority) {
LocalityMap* locality_map = priorities_[priority].get(); LocalityMap* locality_map = priorities_[priority].get();
const auto* locality_map_update = priority_list_update.Find(priority); const auto* locality_map_update = priority_list_update_.Find(priority);
// Propagate locality_map_update. // Propagate locality_map_update.
// TODO(juanlishen): Find a clean way to skip duplicate update for a // TODO(juanlishen): Find a clean way to skip duplicate update for a
// priority. // priority.
@ -1075,22 +1049,14 @@ void XdsLb::PriorityList::UpdateLocked() {
} }
} }
void XdsLb::PriorityList::ResetBackoffLocked() { void XdsLb::UpdateXdsPickerLocked() {
for (size_t i = 0; i < priorities_.size(); ++i) {
priorities_[i]->ResetBackoffLocked();
}
}
void XdsLb::PriorityList::ShutdownLocked() { priorities_.clear(); }
void XdsLb::PriorityList::UpdateXdsPickerLocked() {
// If we are in fallback mode, don't generate an xds picker from localities. // If we are in fallback mode, don't generate an xds picker from localities.
if (xds_policy_->fallback_policy_ != nullptr) return; if (fallback_policy_ != nullptr) return;
if (current_priority() == UINT32_MAX) { if (current_priority_ == UINT32_MAX) {
grpc_error* error = grpc_error_set_int( grpc_error* error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready locality map"), GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready locality map"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
xds_policy_->channel_control_helper()->UpdateState( channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_core::MakeUnique<TransientFailurePicker>(error)); grpc_core::MakeUnique<TransientFailurePicker>(error));
return; return;
@ -1098,29 +1064,28 @@ void XdsLb::PriorityList::UpdateXdsPickerLocked() {
priorities_[current_priority_]->UpdateXdsPickerLocked(); priorities_[current_priority_]->UpdateXdsPickerLocked();
} }
void XdsLb::PriorityList::MaybeCreateLocalityMapLocked(uint32_t priority) { void XdsLb::MaybeCreateLocalityMapLocked(uint32_t priority) {
// Exhausted priorities in the update. // Exhausted priorities in the update.
if (!priority_list_update().Contains(priority)) return; if (!priority_list_update_.Contains(priority)) return;
auto new_locality_map = new LocalityMap( auto new_locality_map =
xds_policy_->Ref(DEBUG_LOCATION, "LocalityMap"), priority); new LocalityMap(Ref(DEBUG_LOCATION, "LocalityMap"), priority);
priorities_.emplace_back(OrphanablePtr<LocalityMap>(new_locality_map)); priorities_.emplace_back(OrphanablePtr<LocalityMap>(new_locality_map));
new_locality_map->UpdateLocked(*priority_list_update().Find(priority)); new_locality_map->UpdateLocked(*priority_list_update_.Find(priority));
} }
void XdsLb::PriorityList::FailoverOnConnectionFailureLocked() { void XdsLb::FailoverOnConnectionFailureLocked() {
const uint32_t failed_priority = LowestPriority(); const uint32_t failed_priority = LowestPriority();
// If we're failing over from the lowest priority, report TRANSIENT_FAILURE. // If we're failing over from the lowest priority, report TRANSIENT_FAILURE.
if (failed_priority == priority_list_update().LowestPriority()) { if (failed_priority == priority_list_update_.LowestPriority()) {
UpdateXdsPickerLocked(); UpdateXdsPickerLocked();
} }
MaybeCreateLocalityMapLocked(failed_priority + 1); MaybeCreateLocalityMapLocked(failed_priority + 1);
} }
void XdsLb::PriorityList::FailoverOnDisconnectionLocked( void XdsLb::FailoverOnDisconnectionLocked(uint32_t failed_priority) {
uint32_t failed_priority) {
current_priority_ = UINT32_MAX; current_priority_ = UINT32_MAX;
for (uint32_t next_priority = failed_priority + 1; for (uint32_t next_priority = failed_priority + 1;
next_priority <= priority_list_update().LowestPriority(); next_priority <= priority_list_update_.LowestPriority();
++next_priority) { ++next_priority) {
if (!Contains(next_priority)) { if (!Contains(next_priority)) {
MaybeCreateLocalityMapLocked(next_priority); MaybeCreateLocalityMapLocked(next_priority);
@ -1130,17 +1095,17 @@ void XdsLb::PriorityList::FailoverOnDisconnectionLocked(
} }
} }
void XdsLb::PriorityList::SwitchToHigherPriorityLocked(uint32_t priority) { void XdsLb::SwitchToHigherPriorityLocked(uint32_t priority) {
current_priority_ = priority; current_priority_ = priority;
DeactivatePrioritiesLowerThan(current_priority_); DeactivatePrioritiesLowerThan(current_priority_);
UpdateXdsPickerLocked(); UpdateXdsPickerLocked();
} }
void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) { void XdsLb::DeactivatePrioritiesLowerThan(uint32_t priority) {
if (priorities_.empty()) return; if (priorities_.empty()) return;
// Deactivate the locality maps from the lowest priority. // Deactivate the locality maps from the lowest priority.
for (uint32_t p = LowestPriority(); p > priority; --p) { for (uint32_t p = LowestPriority(); p > priority; --p) {
if (xds_policy_->locality_retention_interval_ms_ == 0) { if (locality_retention_interval_ms_ == 0) {
priorities_.pop_back(); priorities_.pop_back();
} else { } else {
priorities_[p]->DeactivateLocked(); priorities_[p]->DeactivateLocked();
@ -1148,8 +1113,7 @@ void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) {
} }
} }
OrphanablePtr<XdsLb::PriorityList::LocalityMap::Locality> OrphanablePtr<XdsLb::LocalityMap::Locality> XdsLb::ExtractLocalityLocked(
XdsLb::PriorityList::ExtractLocalityLocked(
const RefCountedPtr<XdsLocalityName>& name, uint32_t exclude_priority) { const RefCountedPtr<XdsLocalityName>& name, uint32_t exclude_priority) {
for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { for (uint32_t priority = 0; priority < priorities_.size(); ++priority) {
if (priority == exclude_priority) continue; if (priority == exclude_priority) continue;
@ -1161,10 +1125,10 @@ XdsLb::PriorityList::ExtractLocalityLocked(
} }
// //
// XdsLb::PriorityList::LocalityMap // XdsLb::LocalityMap
// //
XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy, XdsLb::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
uint32_t priority) uint32_t priority)
: xds_policy_(std::move(xds_policy)), priority_(priority) { : xds_policy_(std::move(xds_policy)), priority_(priority) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@ -1189,7 +1153,7 @@ XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr<XdsLb> xds_policy,
} }
} }
void XdsLb::PriorityList::LocalityMap::UpdateLocked( void XdsLb::LocalityMap::UpdateLocked(
const XdsPriorityListUpdate::LocalityMap& locality_map_update) { const XdsPriorityListUpdate::LocalityMap& locality_map_update) {
if (xds_policy_->shutting_down_) return; if (xds_policy_->shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@ -1221,7 +1185,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateLocked(
OrphanablePtr<Locality>& locality = localities_[name]; OrphanablePtr<Locality>& locality = localities_[name];
if (locality == nullptr) { if (locality == nullptr) {
// Move from another locality map if possible. // Move from another locality map if possible.
locality = priority_list()->ExtractLocalityLocked(name, priority_); locality = xds_policy_->ExtractLocalityLocked(name, priority_);
if (locality != nullptr) { if (locality != nullptr) {
locality->set_locality_map( locality->set_locality_map(
Ref(DEBUG_LOCATION, "LocalityMap+Locality_move")); Ref(DEBUG_LOCATION, "LocalityMap+Locality_move"));
@ -1237,11 +1201,11 @@ void XdsLb::PriorityList::LocalityMap::UpdateLocked(
} }
} }
void XdsLb::PriorityList::LocalityMap::ResetBackoffLocked() { void XdsLb::LocalityMap::ResetBackoffLocked() {
for (auto& p : localities_) p.second->ResetBackoffLocked(); for (auto& p : localities_) p.second->ResetBackoffLocked();
} }
void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { void XdsLb::LocalityMap::UpdateXdsPickerLocked() {
// Construct a new xds picker which maintains a map of all locality pickers // Construct a new xds picker which maintains a map of all locality pickers
// that are ready. Each locality is represented by a portion of the range // that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the // proportional to its weight, such that the total range is the sum of the
@ -1264,8 +1228,8 @@ void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() {
std::move(picker_list))); std::move(picker_list)));
} }
OrphanablePtr<XdsLb::PriorityList::LocalityMap::Locality> OrphanablePtr<XdsLb::LocalityMap::Locality>
XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked( XdsLb::LocalityMap::ExtractLocalityLocked(
const RefCountedPtr<XdsLocalityName>& name) { const RefCountedPtr<XdsLocalityName>& name) {
for (auto iter = localities_.begin(); iter != localities_.end(); ++iter) { for (auto iter = localities_.begin(); iter != localities_.end(); ++iter) {
const auto& name_in_map = iter->first; const auto& name_in_map = iter->first;
@ -1278,7 +1242,7 @@ XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked(
return nullptr; return nullptr;
} }
void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { void XdsLb::LocalityMap::DeactivateLocked() {
// If already deactivated, don't do it again. // If already deactivated, don't do it again.
if (delayed_removal_timer_callback_pending_) return; if (delayed_removal_timer_callback_pending_) return;
MaybeCancelFailoverTimerLocked(); MaybeCancelFailoverTimerLocked();
@ -1299,24 +1263,24 @@ void XdsLb::PriorityList::LocalityMap::DeactivateLocked() {
delayed_removal_timer_callback_pending_ = true; delayed_removal_timer_callback_pending_ = true;
} }
bool XdsLb::PriorityList::LocalityMap::MaybeReactivateLocked() { bool XdsLb::LocalityMap::MaybeReactivateLocked() {
// Don't reactivate a priority that is not higher than the current one. // Don't reactivate a priority that is not higher than the current one.
if (priority_ >= priority_list()->current_priority()) return false; if (priority_ >= xds_policy_->current_priority_) return false;
// Reactivate this priority by cancelling deletion timer. // Reactivate this priority by cancelling deletion timer.
if (delayed_removal_timer_callback_pending_) { if (delayed_removal_timer_callback_pending_) {
grpc_timer_cancel(&delayed_removal_timer_); grpc_timer_cancel(&delayed_removal_timer_);
} }
// Switch to this higher priority if it's READY. // Switch to this higher priority if it's READY.
if (connectivity_state_ != GRPC_CHANNEL_READY) return false; if (connectivity_state_ != GRPC_CHANNEL_READY) return false;
priority_list()->SwitchToHigherPriorityLocked(priority_); xds_policy_->SwitchToHigherPriorityLocked(priority_);
return true; return true;
} }
void XdsLb::PriorityList::LocalityMap::MaybeCancelFailoverTimerLocked() { void XdsLb::LocalityMap::MaybeCancelFailoverTimerLocked() {
if (failover_timer_callback_pending_) grpc_timer_cancel(&failover_timer_); if (failover_timer_callback_pending_) grpc_timer_cancel(&failover_timer_);
} }
void XdsLb::PriorityList::LocalityMap::Orphan() { void XdsLb::LocalityMap::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " orphaned.", xds_policy(), gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " orphaned.", xds_policy(),
priority_); priority_);
@ -1329,11 +1293,11 @@ void XdsLb::PriorityList::LocalityMap::Orphan() {
Unref(DEBUG_LOCATION, "LocalityMap+Orphan"); Unref(DEBUG_LOCATION, "LocalityMap+Orphan");
} }
void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { void XdsLb::LocalityMap::OnLocalityStateUpdateLocked() {
UpdateConnectivityStateLocked(); UpdateConnectivityStateLocked();
// Ignore priorities not in priority_list_update. // Ignore priorities not in priority_list_update.
if (!priority_list_update().Contains(priority_)) return; if (!priority_list_update().Contains(priority_)) return;
const uint32_t current_priority = priority_list()->current_priority(); const uint32_t current_priority = xds_policy_->current_priority_;
// Ignore lower-than-current priorities. // Ignore lower-than-current priorities.
if (priority_ > current_priority) return; if (priority_ > current_priority) return;
// Maybe update fallback state. // Maybe update fallback state.
@ -1347,13 +1311,13 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() {
if (connectivity_state_ == GRPC_CHANNEL_READY) { if (connectivity_state_ == GRPC_CHANNEL_READY) {
MaybeCancelFailoverTimerLocked(); MaybeCancelFailoverTimerLocked();
// If a higher-than-current priority becomes READY, switch to use it. // If a higher-than-current priority becomes READY, switch to use it.
priority_list()->SwitchToHigherPriorityLocked(priority_); xds_policy_->SwitchToHigherPriorityLocked(priority_);
} else if (connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { } else if (connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If a higher-than-current priority becomes TRANSIENT_FAILURE, only // If a higher-than-current priority becomes TRANSIENT_FAILURE, only
// handle it if it's the priority that is still in failover timeout. // handle it if it's the priority that is still in failover timeout.
if (failover_timer_callback_pending_) { if (failover_timer_callback_pending_) {
MaybeCancelFailoverTimerLocked(); MaybeCancelFailoverTimerLocked();
priority_list()->FailoverOnConnectionFailureLocked(); xds_policy_->FailoverOnConnectionFailureLocked();
} }
} }
return; return;
@ -1361,7 +1325,7 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() {
// Update is for current priority. // Update is for current priority.
if (connectivity_state_ != GRPC_CHANNEL_READY) { if (connectivity_state_ != GRPC_CHANNEL_READY) {
// Fail over if it's no longer READY. // Fail over if it's no longer READY.
priority_list()->FailoverOnDisconnectionLocked(priority_); xds_policy_->FailoverOnDisconnectionLocked(priority_);
} }
// At this point, one of the following things has happened to the current // At this point, one of the following things has happened to the current
// priority. // priority.
@ -1369,10 +1333,10 @@ void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() {
// 2. It changed to a lower priority due to failover. // 2. It changed to a lower priority due to failover.
// 3. It became invalid because failover didn't yield a READY priority. // 3. It became invalid because failover didn't yield a READY priority.
// In any case, update the xds picker. // In any case, update the xds picker.
priority_list()->UpdateXdsPickerLocked(); xds_policy_->UpdateXdsPickerLocked();
} }
void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { void XdsLb::LocalityMap::UpdateConnectivityStateLocked() {
size_t num_ready = 0; size_t num_ready = 0;
size_t num_connecting = 0; size_t num_connecting = 0;
size_t num_idle = 0; size_t num_idle = 0;
@ -1420,8 +1384,7 @@ void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() {
} }
} }
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer( void XdsLb::LocalityMap::OnDelayedRemovalTimer(void* arg, grpc_error* error) {
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run( self->xds_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
@ -1429,14 +1392,13 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer(
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( void XdsLb::LocalityMap::OnDelayedRemovalTimerLocked(void* arg,
void* arg, grpc_error* error) { grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->delayed_removal_timer_callback_pending_ = false; self->delayed_removal_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) {
auto* priority_list = self->priority_list();
const bool keep = self->priority_list_update().Contains(self->priority_) && const bool keep = self->priority_list_update().Contains(self->priority_) &&
self->priority_ <= priority_list->current_priority(); self->priority_ <= self->xds_policy_->current_priority_;
if (!keep) { if (!keep) {
// This check is to make sure we always delete the locality maps from // This check is to make sure we always delete the locality maps from
// the lowest priority even if the closures of the back-to-back timers // the lowest priority even if the closures of the back-to-back timers
@ -1445,8 +1407,8 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
// deactivated locality maps when out-of-order closures are run. // deactivated locality maps when out-of-order closures are run.
// TODO(juanlishen): Check the timer implementation to see if this // TODO(juanlishen): Check the timer implementation to see if this
// defense is necessary. // defense is necessary.
if (self->priority_ == priority_list->LowestPriority()) { if (self->priority_ == self->xds_policy_->LowestPriority()) {
priority_list->priorities_.pop_back(); self->xds_policy_->priorities_.pop_back();
} else { } else {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"[xdslb %p] Priority %" PRIu32 "[xdslb %p] Priority %" PRIu32
@ -1459,8 +1421,7 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); self->Unref(DEBUG_LOCATION, "LocalityMap+timer");
} }
void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg, void XdsLb::LocalityMap::OnFailoverTimer(void* arg, grpc_error* error) {
grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run( self->xds_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, self, GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, self,
@ -1468,22 +1429,20 @@ void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( void XdsLb::LocalityMap::OnFailoverTimerLocked(void* arg, grpc_error* error) {
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg); LocalityMap* self = static_cast<LocalityMap*>(arg);
self->failover_timer_callback_pending_ = false; self->failover_timer_callback_pending_ = false;
if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) {
self->priority_list()->FailoverOnConnectionFailureLocked(); self->xds_policy_->FailoverOnConnectionFailureLocked();
} }
self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked");
} }
// //
// XdsLb::PriorityList::LocalityMap::Locality // XdsLb::LocalityMap::Locality
// //
XdsLb::PriorityList::LocalityMap::Locality::Locality( XdsLb::LocalityMap::Locality::Locality(RefCountedPtr<LocalityMap> locality_map,
RefCountedPtr<LocalityMap> locality_map,
RefCountedPtr<XdsLocalityName> name) RefCountedPtr<XdsLocalityName> name)
: locality_map_(std::move(locality_map)), name_(std::move(name)) { : locality_map_(std::move(locality_map)), name_(std::move(name)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
@ -1492,7 +1451,7 @@ XdsLb::PriorityList::LocalityMap::Locality::Locality(
} }
} }
XdsLb::PriorityList::LocalityMap::Locality::~Locality() { XdsLb::LocalityMap::Locality::~Locality() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: destroying locality", gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: destroying locality",
xds_policy(), this, name_->AsHumanReadableString()); xds_policy(), this, name_->AsHumanReadableString());
@ -1500,8 +1459,7 @@ XdsLb::PriorityList::LocalityMap::Locality::~Locality() {
locality_map_.reset(DEBUG_LOCATION, "Locality"); locality_map_.reset(DEBUG_LOCATION, "Locality");
} }
grpc_channel_args* grpc_channel_args* XdsLb::LocalityMap::Locality::CreateChildPolicyArgsLocked(
XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked(
const grpc_channel_args* args_in) { const grpc_channel_args* args_in) {
const grpc_arg args_to_add[] = { const grpc_arg args_to_add[] = {
// A channel arg indicating if the target is a backend inferred from a // A channel arg indicating if the target is a backend inferred from a
@ -1519,7 +1477,7 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked(
} }
OrphanablePtr<LoadBalancingPolicy> OrphanablePtr<LoadBalancingPolicy>
XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( XdsLb::LocalityMap::Locality::CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args) { const char* name, const grpc_channel_args* args) {
Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper")); Helper* helper = new Helper(this->Ref(DEBUG_LOCATION, "Helper"));
LoadBalancingPolicy::Args lb_policy_args; LoadBalancingPolicy::Args lb_policy_args;
@ -1551,8 +1509,8 @@ XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked(
return lb_policy; return lb_policy;
} }
void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( void XdsLb::LocalityMap::Locality::UpdateLocked(uint32_t locality_weight,
uint32_t locality_weight, ServerAddressList serverlist) { ServerAddressList serverlist) {
if (xds_policy()->shutting_down_) return; if (xds_policy()->shutting_down_) return;
// Update locality weight. // Update locality weight.
weight_ = locality_weight; weight_ = locality_weight;
@ -1661,7 +1619,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked(
policy_to_update->UpdateLocked(std::move(update_args)); policy_to_update->UpdateLocked(std::move(update_args));
} }
void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() { void XdsLb::LocalityMap::Locality::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality", gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality",
xds_policy(), this, name_->AsHumanReadableString()); xds_policy(), this, name_->AsHumanReadableString());
@ -1686,19 +1644,19 @@ void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() {
shutdown_ = true; shutdown_ = true;
} }
void XdsLb::PriorityList::LocalityMap::Locality::ResetBackoffLocked() { void XdsLb::LocalityMap::Locality::ResetBackoffLocked() {
child_policy_->ResetBackoffLocked(); child_policy_->ResetBackoffLocked();
if (pending_child_policy_ != nullptr) { if (pending_child_policy_ != nullptr) {
pending_child_policy_->ResetBackoffLocked(); pending_child_policy_->ResetBackoffLocked();
} }
} }
void XdsLb::PriorityList::LocalityMap::Locality::Orphan() { void XdsLb::LocalityMap::Locality::Orphan() {
ShutdownLocked(); ShutdownLocked();
Unref(); Unref();
} }
void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { void XdsLb::LocalityMap::Locality::DeactivateLocked() {
// If already deactivated, don't do that again. // If already deactivated, don't do that again.
if (weight_ == 0) return; if (weight_ == 0) return;
// Set the locality weight to 0 so that future xds picker won't contain this // Set the locality weight to 0 so that future xds picker won't contain this
@ -1715,8 +1673,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() {
delayed_removal_timer_callback_pending_ = true; delayed_removal_timer_callback_pending_ = true;
} }
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer( void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimer(void* arg,
void* arg, grpc_error* error) { grpc_error* error) {
Locality* self = static_cast<Locality*>(arg); Locality* self = static_cast<Locality*>(arg);
self->xds_policy()->combiner()->Run( self->xds_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
@ -1724,7 +1682,7 @@ void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer(
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( void XdsLb::LocalityMap::Locality::OnDelayedRemovalTimerLocked(
void* arg, grpc_error* error) { void* arg, grpc_error* error) {
Locality* self = static_cast<Locality*>(arg); Locality* self = static_cast<Locality*>(arg);
self->delayed_removal_timer_callback_pending_ = false; self->delayed_removal_timer_callback_pending_ = false;
@ -1738,20 +1696,18 @@ void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked(
// XdsLb::Locality::Helper // XdsLb::Locality::Helper
// //
bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByPendingChild() bool XdsLb::LocalityMap::Locality::Helper::CalledByPendingChild() const {
const {
GPR_ASSERT(child_ != nullptr); GPR_ASSERT(child_ != nullptr);
return child_ == locality_->pending_child_policy_.get(); return child_ == locality_->pending_child_policy_.get();
} }
bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByCurrentChild() bool XdsLb::LocalityMap::Locality::Helper::CalledByCurrentChild() const {
const {
GPR_ASSERT(child_ != nullptr); GPR_ASSERT(child_ != nullptr);
return child_ == locality_->child_policy_.get(); return child_ == locality_->child_policy_.get();
} }
RefCountedPtr<SubchannelInterface> RefCountedPtr<SubchannelInterface>
XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel( XdsLb::LocalityMap::Locality::Helper::CreateSubchannel(
const grpc_channel_args& args) { const grpc_channel_args& args) {
if (locality_->xds_policy()->shutting_down_ || if (locality_->xds_policy()->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) { (!CalledByPendingChild() && !CalledByCurrentChild())) {
@ -1761,7 +1717,7 @@ XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel(
args); args);
} }
void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( void XdsLb::LocalityMap::Locality::Helper::UpdateState(
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
if (locality_->xds_policy()->shutting_down_) return; if (locality_->xds_policy()->shutting_down_) return;
// If this request is from the pending child policy, ignore it until // If this request is from the pending child policy, ignore it until
@ -1797,8 +1753,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState(
locality_->locality_map_->OnLocalityStateUpdateLocked(); locality_->locality_map_->OnLocalityStateUpdateLocked();
} }
void XdsLb::PriorityList::LocalityMap::Locality::Helper::AddTraceEvent( void XdsLb::LocalityMap::Locality::Helper::AddTraceEvent(TraceSeverity severity,
TraceSeverity severity, StringView message) { StringView message) {
if (locality_->xds_policy()->shutting_down_ || if (locality_->xds_policy()->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) { (!CalledByPendingChild() && !CalledByCurrentChild())) {
return; return;

Loading…
Cancel
Save