From 6ced125bb3d95d20068c6c2916c9f70fa2f1460b Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Thu, 26 Sep 2019 17:20:44 -0700 Subject: [PATCH] xds failover locality handling --- include/grpc/impl/codegen/grpc_types.h | 7 + .../client_channel/lb_policy/xds/xds.cc | 958 ++++++++++++------ .../lb_policy/xds/xds_load_balancer_api.cc | 63 +- .../lb_policy/xds/xds_load_balancer_api.h | 71 +- src/core/lib/gprpp/inlined_vector.h | 4 + test/core/gprpp/inlined_vector_test.cc | 18 + test/core/util/ubsan_suppressions.txt | 4 + test/cpp/end2end/xds_end2end_test.cc | 166 ++- 8 files changed, 965 insertions(+), 326 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 3bedf219dc0..cb28de0f9b1 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -339,6 +339,13 @@ typedef struct { value is 15 minutes. */ #define GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS \ "grpc.xds_locality_retention_interval_ms" +/* Timeout in milliseconds to wait for the localities of a specific priority to + complete their initial connection attempt before xDS fails over to the next + priority. Specifically, the connection attempt of a priority is considered + completed when any locality of that priority is ready or all the localities + of that priority fail to connect. If 0, failover happens immediately. Default + value is 10 seconds. 
*/ +#define GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS "grpc.xds_failover_timeout_ms" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ "grpc.workaround.cronet_compression" diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index 70edfed14c5..c5b9b3dc437 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -110,6 +110,7 @@ #define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000 #define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000) +#define GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS 10000 namespace grpc_core { @@ -374,7 +375,7 @@ class XdsLb : public LoadBalancingPolicy { // We need this wrapper for the following reasons: // 1. To process per-locality load reporting. // 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control - // references to it by the xds picker and the locality entry. + // references to it by the xds picker and the locality. class PickerWrapper : public RefCounted { public: PickerWrapper(UniquePtr picker, @@ -447,89 +448,184 @@ class XdsLb : public LoadBalancingPolicy { LoadBalancingPolicy* child_ = nullptr; }; - class LocalityMap { + // There is only one PriorityList instance, which has the same lifetime with + // the XdsLb instance. + class PriorityList { public: - class LocalityEntry : public InternallyRefCounted { + // Each LocalityMap holds a ref to the XdsLb. 
+ class LocalityMap : public InternallyRefCounted { public: - LocalityEntry(RefCountedPtr parent, - RefCountedPtr name); - ~LocalityEntry(); - - void UpdateLocked(uint32_t locality_weight, ServerAddressList serverlist, - LoadBalancingPolicy::Config* child_policy_config, - const grpc_channel_args* args); - void ShutdownLocked(); + // Each Locality holds a ref to the LocalityMap it is in. + class Locality : public InternallyRefCounted { + public: + Locality(RefCountedPtr locality_map, + RefCountedPtr name); + ~Locality(); + + void UpdateLocked(uint32_t locality_weight, + ServerAddressList serverlist); + void ShutdownLocked(); + void ResetBackoffLocked(); + void DeactivateLocked(); + void Orphan() override; + + grpc_connectivity_state connectivity_state() const { + return connectivity_state_; + } + uint32_t weight() const { return weight_; } + RefCountedPtr picker_wrapper() const { + return picker_wrapper_; + } + + void set_locality_map(RefCountedPtr locality_map) { + locality_map_ = std::move(locality_map); + } + + private: + class Helper : public ChannelControlHelper { + public: + explicit Helper(RefCountedPtr locality) + : locality_(std::move(locality)) {} + + ~Helper() { locality_.reset(DEBUG_LOCATION, "Helper"); } + + RefCountedPtr CreateSubchannel( + const grpc_channel_args& args) override; + void UpdateState(grpc_connectivity_state state, + UniquePtr picker) override; + void RequestReresolution() override; + void AddTraceEvent(TraceSeverity severity, + StringView message) override; + void set_child(LoadBalancingPolicy* child) { child_ = child; } + + private: + bool CalledByPendingChild() const; + bool CalledByCurrentChild() const; + + RefCountedPtr locality_; + LoadBalancingPolicy* child_ = nullptr; + }; + + // Methods for dealing with the child policy. 
+ OrphanablePtr CreateChildPolicyLocked( + const char* name, const grpc_channel_args* args); + grpc_channel_args* CreateChildPolicyArgsLocked( + const grpc_channel_args* args); + + static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + + XdsLb* xds_policy() const { return locality_map_->xds_policy(); } + + // The owning locality map. + RefCountedPtr locality_map_; + + RefCountedPtr name_; + OrphanablePtr child_policy_; + OrphanablePtr pending_child_policy_; + RefCountedPtr picker_wrapper_; + grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; + uint32_t weight_; + + // States for delayed removal. + grpc_timer delayed_removal_timer_; + grpc_closure on_delayed_removal_timer_; + bool delayed_removal_timer_callback_pending_ = false; + bool shutdown_ = false; + }; + + LocalityMap(RefCountedPtr xds_policy, uint32_t priority); + + void UpdateLocked( + const XdsPriorityListUpdate::LocalityMap& locality_map_update); void ResetBackoffLocked(); + void UpdateXdsPickerLocked(); + OrphanablePtr ExtractLocalityLocked( + const RefCountedPtr& name); void DeactivateLocked(); + // Returns true if this locality map becomes the currently used one (i.e., + // its priority is selected) after reactivation. 
+ bool MaybeReactivateLocked(); + void MaybeCancelFailoverTimerLocked(); + void Orphan() override; + XdsLb* xds_policy() const { return xds_policy_.get(); } + uint32_t priority() const { return priority_; } grpc_connectivity_state connectivity_state() const { return connectivity_state_; } - uint32_t locality_weight() const { return locality_weight_; } - RefCountedPtr picker_wrapper() const { - return picker_wrapper_; + bool failover_timer_callback_pending() const { + return failover_timer_callback_pending_; } private: - class Helper : public ChannelControlHelper { - public: - explicit Helper(RefCountedPtr entry) - : entry_(std::move(entry)) {} - - ~Helper() { entry_.reset(DEBUG_LOCATION, "Helper"); } - - RefCountedPtr CreateSubchannel( - const grpc_channel_args& args) override; - void UpdateState(grpc_connectivity_state state, - UniquePtr picker) override; - void RequestReresolution() override; - void AddTraceEvent(TraceSeverity severity, StringView message) override; - void set_child(LoadBalancingPolicy* child) { child_ = child; } - - private: - bool CalledByPendingChild() const; - bool CalledByCurrentChild() const; - - RefCountedPtr entry_; - LoadBalancingPolicy* child_ = nullptr; - }; + void OnLocalityStateUpdateLocked(); + void UpdateConnectivityStateLocked(); + static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + static void OnFailoverTimerLocked(void* arg, grpc_error* error); - // Methods for dealing with the child policy. 
- OrphanablePtr CreateChildPolicyLocked( - const char* name, const grpc_channel_args* args); - grpc_channel_args* CreateChildPolicyArgsLocked( - const grpc_channel_args* args); + PriorityList* priority_list() const { + return &xds_policy_->priority_list_; + } + const XdsPriorityListUpdate& priority_list_update() const { + return xds_policy_->priority_list_update_; + } + const XdsPriorityListUpdate::LocalityMap* locality_map_update() const { + return xds_policy_->priority_list_update_.Find(priority_); + } - static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); + RefCountedPtr xds_policy_; - RefCountedPtr parent_; - RefCountedPtr name_; - OrphanablePtr child_policy_; - OrphanablePtr pending_child_policy_; - RefCountedPtr picker_wrapper_; + Map, OrphanablePtr, + XdsLocalityName::Less> + localities_; + const uint32_t priority_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; - uint32_t locality_weight_; - grpc_closure on_delayed_removal_timer_; + + // States for delayed removal. grpc_timer delayed_removal_timer_; + grpc_closure on_delayed_removal_timer_; bool delayed_removal_timer_callback_pending_ = false; + + // States of failover. 
+ grpc_timer failover_timer_; + grpc_closure on_failover_timer_; + bool failover_timer_callback_pending_ = false; }; - explicit LocalityMap(XdsLb* xds_policy) : xds_policy_(xds_policy) {} + explicit PriorityList(XdsLb* xds_policy) : xds_policy_(xds_policy) {} - void UpdateLocked(const XdsLocalityList& locality_list, - LoadBalancingPolicy::Config* child_policy_config, - const grpc_channel_args* args, XdsLb* parent, - bool is_initial_update = false); - void UpdateXdsPickerLocked(); - void ShutdownLocked(); + void UpdateLocked(); void ResetBackoffLocked(); + void ShutdownLocked(); + void UpdateXdsPickerLocked(); + + const XdsPriorityListUpdate& priority_list_update() const { + return xds_policy_->priority_list_update_; + } + uint32_t current_priority() const { return current_priority_; } private: + void MaybeCreateLocalityMapLocked(uint32_t priority); + void FailoverOnConnectionFailureLocked(); + void FailoverOnDisconnectionLocked(uint32_t failed_priority); + void SwitchToHigherPriorityLocked(uint32_t priority); + void DeactivatePrioritiesLowerThan(uint32_t priority); + OrphanablePtr ExtractLocalityLocked( + const RefCountedPtr& name, uint32_t exclude_priority); + // Callers should make sure the priority list is non-empty. + uint32_t LowestPriority() const { + return static_cast(priorities_.size()) - 1; + } + bool Contains(uint32_t priority) { return priority < priorities_.size(); } + XdsLb* xds_policy_; - Map, OrphanablePtr, - XdsLocalityName::Less> - map_; + + // The list of locality maps, indexed by priority. P0 is the highest + // priority. + InlinedVector, 2> priorities_; + // The priority that is being used. + uint32_t current_priority_ = UINT32_MAX; }; ~XdsLb(); @@ -604,14 +700,11 @@ class XdsLb : public LoadBalancingPolicy { // The policy to use for the backends. 
RefCountedPtr child_policy_config_; const grpc_millis locality_retention_interval_ms_; - // Map of policies to use in the backend - LocalityMap locality_map_; - // TODO(mhaidry) : Add support for multiple maps of localities - // with different priorities - XdsLocalityList locality_list_; - // TODO(mhaidry) : Add a pending locality map that may be swapped with the - // the current one when new localities in the pending map are ready - // to accept connections + const grpc_millis locality_map_failover_timeout_ms_; + // A list of locality maps indexed by priority. + PriorityList priority_list_; + // The update for priority_list_. + XdsPriorityListUpdate priority_list_update_; // The config for dropping calls. RefCountedPtr drop_config_; @@ -1111,7 +1204,7 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( GRPC_ERROR_UNREF(parse_error); return; } - if (update.locality_list.empty() && !update.drop_all) { + if (update.priority_list_update.empty() && !update.drop_all) { char* response_slice_str = grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); gpr_log(GPR_ERROR, @@ -1125,38 +1218,49 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] EDS response with %" PRIuPTR - " localities and %" PRIuPTR + " priorities and %" PRIuPTR " drop categories received (drop_all=%d)", - xdslb_policy, update.locality_list.size(), + xdslb_policy, update.priority_list_update.size(), update.drop_config->drop_category_list().size(), update.drop_all); - for (size_t i = 0; i < update.locality_list.size(); ++i) { - const XdsLocalityInfo& locality = update.locality_list[i]; + for (size_t priority = 0; priority < update.priority_list_update.size(); + ++priority) { + const auto* locality_map_update = + update.priority_list_update.Find(static_cast(priority)); gpr_log(GPR_INFO, - "[xdslb %p] Locality %" PRIuPTR " %s contains %" PRIuPTR - " server addresses", - xdslb_policy, 
i, - locality.locality_name->AsHumanReadableString(), - locality.serverlist.size()); - for (size_t j = 0; j < locality.serverlist.size(); ++j) { - char* ipport; - grpc_sockaddr_to_string(&ipport, &locality.serverlist[j].address(), - false); + "[xdslb %p] Priority %" PRIuPTR " contains %" PRIuPTR + " localities", + xdslb_policy, priority, locality_map_update->size()); + size_t locality_count = 0; + for (const auto& p : locality_map_update->localities) { + const auto& locality = p.second; gpr_log(GPR_INFO, - "[xdslb %p] Locality %" PRIuPTR - " %s, server address %" PRIuPTR ": %s", - xdslb_policy, i, - locality.locality_name->AsHumanReadableString(), j, ipport); - gpr_free(ipport); + "[xdslb %p] Priority %" PRIuPTR ", locality %" PRIuPTR + " %s contains %" PRIuPTR " server addresses", + xdslb_policy, priority, locality_count, + locality.name->AsHumanReadableString(), + locality.serverlist.size()); + for (size_t i = 0; i < locality.serverlist.size(); ++i) { + char* ipport; + grpc_sockaddr_to_string(&ipport, &locality.serverlist[i].address(), + false); + gpr_log(GPR_INFO, + "[xdslb %p] Priority %" PRIuPTR ", locality %" PRIuPTR + " %s, server address %" PRIuPTR ": %s", + xdslb_policy, priority, locality_count, + locality.name->AsHumanReadableString(), i, ipport); + gpr_free(ipport); + } + ++locality_count; + } + for (size_t i = 0; i < update.drop_config->drop_category_list().size(); + ++i) { + const XdsDropConfig::DropCategory& drop_category = + update.drop_config->drop_category_list()[i]; + gpr_log(GPR_INFO, + "[xdslb %p] Drop category %s has drop rate %d per million", + xdslb_policy, drop_category.name.get(), + drop_category.parts_per_million); } - } - for (size_t i = 0; i < update.drop_config->drop_category_list().size(); - ++i) { - const XdsDropConfig::DropCategory& drop_category = - update.drop_config->drop_category_list()[i]; - gpr_log(GPR_INFO, - "[xdslb %p] Drop category %s has drop rate %d per million", - xdslb_policy, drop_category.name.get(), - 
drop_category.parts_per_million); } } // Pending LB channel receives a response; promote it. @@ -1187,24 +1291,22 @@ void XdsLb::LbChannelState::EdsCallState::OnResponseReceivedLocked( *xdslb_policy->drop_config_ != *update.drop_config; xdslb_policy->drop_config_ = std::move(update.drop_config); // Ignore identical locality update. - if (xdslb_policy->locality_list_ == update.locality_list) { + if (xdslb_policy->priority_list_update_ == update.priority_list_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] Incoming locality list identical to current, " + "[xdslb %p] Incoming locality update identical to current, " "ignoring. (drop_config_changed=%d)", xdslb_policy, drop_config_changed); } if (drop_config_changed) { - xdslb_policy->locality_map_.UpdateXdsPickerLocked(); + xdslb_policy->priority_list_.UpdateXdsPickerLocked(); } return; } - // Update the locality list. - xdslb_policy->locality_list_ = std::move(update.locality_list); - // Update the locality map. - xdslb_policy->locality_map_.UpdateLocked( - xdslb_policy->locality_list_, xdslb_policy->child_policy_config_.get(), - xdslb_policy->args_, xdslb_policy); + // Update the priority list. + xdslb_policy->priority_list_update_ = + std::move(update.priority_list_update); + xdslb_policy->priority_list_.UpdateLocked(); }(); grpc_slice_unref_internal(response_slice); if (xdslb_policy->shutting_down_) { @@ -1709,7 +1811,10 @@ XdsLb::XdsLb(Args args) locality_retention_interval_ms_(grpc_channel_args_find_integer( args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS, {GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})), - locality_map_(this) { + locality_map_failover_timeout_ms_(grpc_channel_args_find_integer( + args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, + {GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})), + priority_list_(this) { // Record server name. 
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -1731,7 +1836,6 @@ XdsLb::~XdsLb() { } gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); - locality_list_.clear(); } void XdsLb::ShutdownLocked() { @@ -1742,7 +1846,7 @@ void XdsLb::ShutdownLocked() { if (fallback_at_startup_checks_pending_) { grpc_timer_cancel(&lb_fallback_timer_); } - locality_map_.ShutdownLocked(); + priority_list_.ShutdownLocked(); if (fallback_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), interested_parties()); @@ -1770,7 +1874,7 @@ void XdsLb::ResetBackoffLocked() { if (pending_lb_chand_ != nullptr) { grpc_channel_reset_connect_backoff(pending_lb_chand_->channel()); } - locality_map_.ResetBackoffLocked(); + priority_list_.ResetBackoffLocked(); if (fallback_policy_ != nullptr) { fallback_policy_->ResetBackoffLocked(); } @@ -1835,8 +1939,7 @@ void XdsLb::UpdateLocked(UpdateArgs args) { return; } ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args); - locality_map_.UpdateLocked(locality_list_, child_policy_config_.get(), args_, - this, is_initial_update); + priority_list_.UpdateLocked(); // Update the existing fallback policy. The fallback policy config and/or the // fallback addresses may be new. if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); @@ -2031,150 +2134,444 @@ void XdsLb::MaybeExitFallbackMode() { } // -// XdsLb::LocalityMap +// XdsLb::PriorityList // -void XdsLb::LocalityMap::UpdateLocked( - const XdsLocalityList& locality_list, - LoadBalancingPolicy::Config* child_policy_config, - const grpc_channel_args* args, XdsLb* parent, bool is_initial_update) { - if (parent->shutting_down_) return; - // Add or update the localities in locality_list. 
- for (size_t i = 0; i < locality_list.size(); i++) { - auto& locality_name = locality_list[i].locality_name; - auto iter = map_.find(locality_name); - // Add a new entry in the locality map if a new locality is received in the - // locality list. - if (iter == map_.end()) { - OrphanablePtr new_entry = MakeOrphanable( - parent->Ref(DEBUG_LOCATION, "LocalityEntry"), locality_name); - iter = map_.emplace(locality_name, std::move(new_entry)).first; +void XdsLb::PriorityList::UpdateLocked() { + const auto& priority_list_update = xds_policy_->priority_list_update_; + // 1. Remove from the priority list the priorities that are not in the update. + DeactivatePrioritiesLowerThan(priority_list_update.LowestPriority()); + // 2. Update all the existing priorities. + for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { + LocalityMap* locality_map = priorities_[priority].get(); + const auto* locality_map_update = priority_list_update.Find(priority); + // Propagate locality_map_update. + // TODO(juanlishen): Find a clean way to skip duplicate update for a + // priority. + locality_map->UpdateLocked(*locality_map_update); + } + // 3. Only create a new locality map if all the existing ones have failed. + if (priorities_.empty() || + !priorities_[priorities_.size() - 1]->failover_timer_callback_pending()) { + const uint32_t new_priority = static_cast(priorities_.size()); + // Create a new locality map. Note that in some rare cases (e.g., the + // locality map reports TRANSIENT_FAILURE synchronously due to subchannel + // sharing), the following invocation may result in multiple locality maps + // to be created. 
+ MaybeCreateLocalityMapLocked(new_priority); + } +} + +void XdsLb::PriorityList::ResetBackoffLocked() { + for (size_t i = 0; i < priorities_.size(); ++i) { + priorities_[i]->ResetBackoffLocked(); + } +} + +void XdsLb::PriorityList::ShutdownLocked() { priorities_.clear(); } + +void XdsLb::PriorityList::UpdateXdsPickerLocked() { + // If we are in fallback mode, don't generate an xds picker from localities. + if (xds_policy_->fallback_policy_ != nullptr) return; + if (current_priority() == UINT32_MAX) { + grpc_error* error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready locality map"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, + UniquePtr(New(error))); + return; + } + priorities_[current_priority_]->UpdateXdsPickerLocked(); +} + +void XdsLb::PriorityList::MaybeCreateLocalityMapLocked(uint32_t priority) { + // Exhausted priorities in the update. + if (!priority_list_update().Contains(priority)) return; + auto new_locality_map = New( + xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+LocalityMap"), priority); + priorities_.emplace_back(OrphanablePtr(new_locality_map)); + new_locality_map->UpdateLocked(*priority_list_update().Find(priority)); +} + +void XdsLb::PriorityList::FailoverOnConnectionFailureLocked() { + const uint32_t failed_priority = LowestPriority(); + // If we're failing over from the lowest priority, report TRANSIENT_FAILURE. 
+ if (failed_priority == priority_list_update().LowestPriority()) { + UpdateXdsPickerLocked(); + } + MaybeCreateLocalityMapLocked(failed_priority + 1); +} + +void XdsLb::PriorityList::FailoverOnDisconnectionLocked( + uint32_t failed_priority) { + current_priority_ = UINT32_MAX; + for (uint32_t next_priority = failed_priority + 1; + next_priority <= priority_list_update().LowestPriority(); + ++next_priority) { + if (!Contains(next_priority)) { + MaybeCreateLocalityMapLocked(next_priority); + return; } - // Keep a copy of serverlist in locality_list_ so that we can compare it - // with the future ones. - iter->second->UpdateLocked(locality_list[i].lb_weight, - locality_list[i].serverlist, child_policy_config, - args); - } - // Remove (later) the localities not in locality_list. - for (auto& p : map_) { - const XdsLocalityName* locality_name = p.first.get(); - LocalityEntry* locality_entry = p.second.get(); - bool in_locality_list = false; - for (size_t i = 0; i < locality_list.size(); ++i) { - if (*locality_list[i].locality_name == *locality_name) { - in_locality_list = true; - break; + if (priorities_[next_priority]->MaybeReactivateLocked()) return; + } +} + +void XdsLb::PriorityList::SwitchToHigherPriorityLocked(uint32_t priority) { + current_priority_ = priority; + DeactivatePrioritiesLowerThan(current_priority_); + UpdateXdsPickerLocked(); +} + +void XdsLb::PriorityList::DeactivatePrioritiesLowerThan(uint32_t priority) { + if (priorities_.empty()) return; + // Deactivate the locality maps from the lowest priority. 
+ for (uint32_t p = LowestPriority(); p > priority; --p) { + if (xds_policy_->locality_retention_interval_ms_ == 0) { + priorities_.pop_back(); + } else { + priorities_[p]->DeactivateLocked(); + } + } +} + +OrphanablePtr +XdsLb::PriorityList::ExtractLocalityLocked( + const RefCountedPtr& name, uint32_t exclude_priority) { + for (uint32_t priority = 0; priority < priorities_.size(); ++priority) { + if (priority == exclude_priority) continue; + LocalityMap* locality_map = priorities_[priority].get(); + auto locality = locality_map->ExtractLocalityLocked(name); + if (locality != nullptr) return locality; + } + return nullptr; +} + +// +// XdsLb::PriorityList::LocalityMap +// + +XdsLb::PriorityList::LocalityMap::LocalityMap(RefCountedPtr xds_policy, + uint32_t priority) + : xds_policy_(std::move(xds_policy)), priority_(priority) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Creating priority %" PRIu32, + xds_policy_.get(), priority_); + } + GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, + this, grpc_combiner_scheduler(xds_policy_->combiner())); + GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimerLocked, this, + grpc_combiner_scheduler(xds_policy_->combiner())); + // Start the failover timer. + Ref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked").release(); + grpc_timer_init( + &failover_timer_, + ExecCtx::Get()->Now() + xds_policy_->locality_map_failover_timeout_ms_, + &on_failover_timer_); + failover_timer_callback_pending_ = true; + // This is the first locality map ever created, report CONNECTING. 
+ if (priority_ == 0) { + xds_policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_CONNECTING, + UniquePtr( + New(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); + } +} + +void XdsLb::PriorityList::LocalityMap::UpdateLocked( + const XdsPriorityListUpdate::LocalityMap& locality_map_update) { + if (xds_policy_->shutting_down_) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Start Updating priority %" PRIu32, + xds_policy(), priority_); + } + // Maybe reactivate the locality map in case all the active locality maps have + // failed. + MaybeReactivateLocked(); + // Remove (later) the localities not in locality_map_update. + for (auto iter = localities_.begin(); iter != localities_.end();) { + const auto& name = iter->first; + Locality* locality = iter->second.get(); + if (locality_map_update.Contains(name)) { + ++iter; + continue; + } + if (xds_policy()->locality_retention_interval_ms_ == 0) { + iter = localities_.erase(iter); + } else { + locality->DeactivateLocked(); + ++iter; + } + } + // Add or update the localities in locality_map_update. + for (const auto& p : locality_map_update.localities) { + const auto& name = p.first; + const auto& locality_update = p.second; + OrphanablePtr& locality = localities_[name]; + if (locality == nullptr) { + // Move from another locality map if possible. + locality = priority_list()->ExtractLocalityLocked(name, priority_); + if (locality != nullptr) { + locality->set_locality_map( + Ref(DEBUG_LOCATION, "LocalityMap+Locality_move")); + } else { + locality = MakeOrphanable( + Ref(DEBUG_LOCATION, "LocalityMap+Locality"), name); } } - if (!in_locality_list) locality_entry->DeactivateLocked(); + // Keep a copy of serverlist in the update so that we can compare it + // with the future ones. + locality->UpdateLocked(locality_update.lb_weight, + locality_update.serverlist); } - // Generate a new xds picker immediately. 
- if (!is_initial_update) UpdateXdsPickerLocked(); } -void XdsLb::LocalityMap::UpdateXdsPickerLocked() { - // If we are in fallback mode, don't generate an xds picker from localities. - if (xds_policy_->fallback_policy_ != nullptr) return; +void XdsLb::PriorityList::LocalityMap::ResetBackoffLocked() { + for (auto& p : localities_) p.second->ResetBackoffLocked(); +} + +void XdsLb::PriorityList::LocalityMap::UpdateXdsPickerLocked() { // Construct a new xds picker which maintains a map of all locality pickers // that are ready. Each locality is represented by a portion of the range // proportional to its weight, such that the total range is the sum of the // weights of all localities. + Picker::PickerList picker_list; uint32_t end = 0; + for (const auto& p : localities_) { + const auto& locality_name = p.first; + const Locality* locality = p.second.get(); + // Skip the localities that are not in the latest locality map update. + if (!locality_map_update()->Contains(locality_name)) continue; + if (locality->connectivity_state() != GRPC_CHANNEL_READY) continue; + end += locality->weight(); + picker_list.push_back(MakePair(end, locality->picker_wrapper())); + } + xds_policy()->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, UniquePtr(New( + xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), + std::move(picker_list)))); +} + +OrphanablePtr +XdsLb::PriorityList::LocalityMap::ExtractLocalityLocked( + const RefCountedPtr& name) { + for (auto iter = localities_.begin(); iter != localities_.end(); ++iter) { + const auto& name_in_map = iter->first; + if (*name_in_map == *name) { + auto locality = std::move(iter->second); + localities_.erase(iter); + return locality; + } + } + return nullptr; +} + +void XdsLb::PriorityList::LocalityMap::DeactivateLocked() { + // If already deactivated, don't do it again. + if (delayed_removal_timer_callback_pending_) return; + MaybeCancelFailoverTimerLocked(); + // Start a timer to delete the locality. 
+ Ref(DEBUG_LOCATION, "LocalityMap+timer").release(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, + "[xdslb %p] Will remove priority %" PRIu32 " in %" PRId64 " ms.", + xds_policy(), priority_, + xds_policy()->locality_retention_interval_ms_); + } + grpc_timer_init( + &delayed_removal_timer_, + ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, + &on_delayed_removal_timer_); + delayed_removal_timer_callback_pending_ = true; +} + +bool XdsLb::PriorityList::LocalityMap::MaybeReactivateLocked() { + // Don't reactivate a priority that is not higher than the current one. + if (priority_ >= priority_list()->current_priority()) return false; + // Reactivate this priority by cancelling deletion timer. + if (delayed_removal_timer_callback_pending_) { + grpc_timer_cancel(&delayed_removal_timer_); + } + // Switch to this higher priority if it's READY. + if (connectivity_state_ != GRPC_CHANNEL_READY) return false; + priority_list()->SwitchToHigherPriorityLocked(priority_); + return true; +} + +void XdsLb::PriorityList::LocalityMap::MaybeCancelFailoverTimerLocked() { + if (failover_timer_callback_pending_) grpc_timer_cancel(&failover_timer_); +} + +void XdsLb::PriorityList::LocalityMap::Orphan() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, "[xdslb %p] Priority %" PRIu32 " orphaned.", xds_policy(), + priority_); + } + MaybeCancelFailoverTimerLocked(); + if (delayed_removal_timer_callback_pending_) { + grpc_timer_cancel(&delayed_removal_timer_); + } + localities_.clear(); + Unref(DEBUG_LOCATION, "LocalityMap+Orphan"); +} + +void XdsLb::PriorityList::LocalityMap::OnLocalityStateUpdateLocked() { + UpdateConnectivityStateLocked(); + // Ignore priorities not in priority_list_update. + if (!priority_list_update().Contains(priority_)) return; + const uint32_t current_priority = priority_list()->current_priority(); + // Ignore lower-than-current priorities. 
+ if (priority_ > current_priority) return; + // Maybe update fallback state. + if (connectivity_state_ == GRPC_CHANNEL_READY) { + xds_policy_->MaybeCancelFallbackAtStartupChecks(); + xds_policy_->MaybeExitFallbackMode(); + } + // Update is for a higher-than-current priority. (Special case: update is for + // any active priority if there is no current priority.) + if (priority_ < current_priority) { + if (connectivity_state_ == GRPC_CHANNEL_READY) { + MaybeCancelFailoverTimerLocked(); + // If a higher-than-current priority becomes READY, switch to use it. + priority_list()->SwitchToHigherPriorityLocked(priority_); + } else if (connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // If a higher-than-current priority becomes TRANSIENT_FAILURE, only + // handle it if it's the priority that is still in failover timeout. + if (failover_timer_callback_pending_) { + MaybeCancelFailoverTimerLocked(); + priority_list()->FailoverOnConnectionFailureLocked(); + } + } + return; + } + // Update is for current priority. + if (connectivity_state_ != GRPC_CHANNEL_READY) { + // Fail over if it's no longer READY. + priority_list()->FailoverOnDisconnectionLocked(priority_); + } + // At this point, one of the following things has happened to the current + // priority. + // 1. It remained the same (but received picker update from its localities). + // 2. It changed to a lower priority due to failover. + // 3. It became invalid because failover didn't yield a READY priority. + // In any case, update the xds picker. 
+ priority_list()->UpdateXdsPickerLocked(); +} + +void XdsLb::PriorityList::LocalityMap::UpdateConnectivityStateLocked() { + size_t num_ready = 0; size_t num_connecting = 0; size_t num_idle = 0; size_t num_transient_failures = 0; - Picker::PickerList pickers; - for (auto& p : map_) { - const LocalityEntry* entry = p.second.get(); - if (entry->locality_weight() == 0) continue; - switch (entry->connectivity_state()) { + for (const auto& p : localities_) { + const auto& locality_name = p.first; + const Locality* locality = p.second.get(); + // Skip the localities that are not in the latest locality map update. + if (!locality_map_update()->Contains(locality_name)) continue; + switch (locality->connectivity_state()) { case GRPC_CHANNEL_READY: { - end += entry->locality_weight(); - pickers.push_back(MakePair(end, entry->picker_wrapper())); + ++num_ready; break; } case GRPC_CHANNEL_CONNECTING: { - num_connecting++; + ++num_connecting; break; } case GRPC_CHANNEL_IDLE: { - num_idle++; + ++num_idle; break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - num_transient_failures++; + ++num_transient_failures; break; } default: GPR_UNREACHABLE_CODE(return ); } } - // Pass on the constructed xds picker if it has any ready pickers in their map - // otherwise pass a QueuePicker if any of the locality pickers are in a - // connecting or idle state, finally return a transient failure picker if all - // locality pickers are in transient failure. 
- if (!pickers.empty()) { - xds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_READY, - UniquePtr( - New(xds_policy_->Ref(DEBUG_LOCATION, "XdsLb+Picker"), - std::move(pickers)))); + if (num_ready > 0) { + connectivity_state_ = GRPC_CHANNEL_READY; } else if (num_connecting > 0) { - xds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_CONNECTING, - UniquePtr( - New(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); + connectivity_state_ = GRPC_CHANNEL_CONNECTING; } else if (num_idle > 0) { - xds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_IDLE, - UniquePtr( - New(xds_policy_->Ref(DEBUG_LOCATION, "QueuePicker")))); + connectivity_state_ = GRPC_CHANNEL_IDLE; } else { - grpc_error* error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "connections to all active localities failing"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - xds_policy_->channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, - UniquePtr(New(error))); + connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { + gpr_log(GPR_INFO, + "[xdslb %p] Priority %" PRIu32 " (%p) connectivity changed to %s", + xds_policy(), priority_, this, + ConnectivityStateName(connectivity_state_)); } } -void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); } +void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked( + void* arg, grpc_error* error) { + LocalityMap* self = static_cast(arg); + self->delayed_removal_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { + auto* priority_list = self->priority_list(); + const bool keep = self->priority_list_update().Contains(self->priority_) && + self->priority_ <= priority_list->current_priority(); + if (!keep) { + // This check is to make sure we always delete the locality maps from the + // lowest priority even if the closures of the back-to-back timers are not + // run 
in FIFO order. + // TODO(juanlishen): Eliminate unnecessary maintenance overhead for some + // deactivated locality maps when out-of-order closures are run. + // TODO(juanlishen): Check the timer implementation to see if this defense + // is necessary. + if (self->priority_ == priority_list->LowestPriority()) { + priority_list->priorities_.pop_back(); + } else { + gpr_log(GPR_ERROR, + "[xdslb %p] Priority %" PRIu32 + " is not the lowest priority (highest numeric value) but is " + "attempted to be deleted.", + self->xds_policy(), self->priority_); + } + } + } + self->Unref(DEBUG_LOCATION, "LocalityMap+timer"); +} -void XdsLb::LocalityMap::ResetBackoffLocked() { - for (auto& p : map_) { - p.second->ResetBackoffLocked(); +void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked( + void* arg, grpc_error* error) { + LocalityMap* self = static_cast(arg); + self->failover_timer_callback_pending_ = false; + if (error == GRPC_ERROR_NONE && !self->xds_policy_->shutting_down_) { + self->priority_list()->FailoverOnConnectionFailureLocked(); } + self->Unref(DEBUG_LOCATION, "LocalityMap+OnFailoverTimerLocked"); } // -// XdsLb::LocalityMap::LocalityEntry +// XdsLb::PriorityList::LocalityMap::Locality // -XdsLb::LocalityMap::LocalityEntry::LocalityEntry( - RefCountedPtr parent, RefCountedPtr name) - : parent_(std::move(parent)), name_(std::move(name)) { +XdsLb::PriorityList::LocalityMap::Locality::Locality( + RefCountedPtr locality_map, + RefCountedPtr name) + : locality_map_(std::move(locality_map)), name_(std::move(name)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, "[xdslb %p] created LocalityEntry %p for %s", - parent_.get(), this, name_->AsHumanReadableString()); + gpr_log(GPR_INFO, "[xdslb %p] created Locality %p for %s", xds_policy(), + this, name_->AsHumanReadableString()); } GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked, - this, grpc_combiner_scheduler(parent_->combiner())); + this, 
grpc_combiner_scheduler(xds_policy()->combiner())); } -XdsLb::LocalityMap::LocalityEntry::~LocalityEntry() { +XdsLb::PriorityList::LocalityMap::Locality::~Locality() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] LocalityEntry %p %s: destroying locality entry", - parent_.get(), this, name_->AsHumanReadableString()); + gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: destroying locality", + xds_policy(), this, name_->AsHumanReadableString()); } - parent_.reset(DEBUG_LOCATION, "LocalityEntry"); + locality_map_.reset(DEBUG_LOCATION, "Locality"); } grpc_channel_args* -XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked( +XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyArgsLocked( const grpc_channel_args* args_in) { const grpc_arg args_to_add[] = { // A channel arg indicating if the target is a backend inferred from a @@ -2192,11 +2589,11 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyArgsLocked( } OrphanablePtr -XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( +XdsLb::PriorityList::LocalityMap::Locality::CreateChildPolicyLocked( const char* name, const grpc_channel_args* args) { Helper* helper = New(this->Ref(DEBUG_LOCATION, "Helper")); LoadBalancingPolicy::Args lb_policy_args; - lb_policy_args.combiner = parent_->combiner(); + lb_policy_args.combiner = xds_policy()->combiner(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = UniquePtr(helper); @@ -2205,41 +2602,40 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( name, std::move(lb_policy_args)); if (GPR_UNLIKELY(lb_policy == nullptr)) { gpr_log(GPR_ERROR, - "[xdslb %p] LocalityEntry %p %s: failure creating child policy %s", - parent_.get(), this, name_->AsHumanReadableString(), name); + "[xdslb %p] Locality %p %s: failure creating child policy %s", + locality_map_.get(), this, name_->AsHumanReadableString(), name); return nullptr; } helper->set_child(lb_policy.get()); if 
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] LocalityEntry %p %s: Created new child policy %s (%p)", - parent_.get(), this, name_->AsHumanReadableString(), name, + "[xdslb %p] Locality %p %s: Created new child policy %s (%p)", + locality_map_.get(), this, name_->AsHumanReadableString(), name, lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created // child policy. This will make the child policy progress upon activity on xDS // LB, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), - parent_->interested_parties()); + xds_policy()->interested_parties()); return lb_policy; } -void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( - uint32_t locality_weight, ServerAddressList serverlist, - LoadBalancingPolicy::Config* child_policy_config, - const grpc_channel_args* args_in) { - if (parent_->shutting_down_) return; +void XdsLb::PriorityList::LocalityMap::Locality::UpdateLocked( + uint32_t locality_weight, ServerAddressList serverlist) { + if (xds_policy()->shutting_down_) return; // Update locality weight. - locality_weight_ = locality_weight; + weight_ = locality_weight; if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); } // Construct update args. UpdateArgs update_args; update_args.addresses = std::move(serverlist); - update_args.config = - child_policy_config == nullptr ? nullptr : child_policy_config->Ref(); - update_args.args = CreateChildPolicyArgsLocked(args_in); + update_args.config = xds_policy()->child_policy_config_ == nullptr + ? nullptr + : xds_policy()->child_policy_config_->Ref(); + update_args.args = CreateChildPolicyArgsLocked(xds_policy()->args_); // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store // the new child policy in pending_child_policy_. 
Once the new child @@ -2291,9 +2687,10 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( // when the new child transitions into state READY. // TODO(juanlishen): If the child policy is not configured via service config, // use whatever algorithm is specified by the balancer. - const char* child_policy_name = child_policy_config == nullptr - ? "round_robin" - : child_policy_config->name(); + const char* child_policy_name = + xds_policy()->child_policy_config_ == nullptr + ? "round_robin" + : xds_policy()->child_policy_config_->name(); const bool create_policy = // case 1 child_policy_ == nullptr || @@ -2310,8 +2707,8 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( // pending_child_policy_ (cases 2b and 3b). if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, - "[xdslb %p] LocalityEntry %p %s: Creating new %schild policy %s", - parent_.get(), this, name_->AsHumanReadableString(), + "[xdslb %p] Locality %p %s: Creating new %schild policy %s", + locality_map_.get(), this, name_->AsHumanReadableString(), child_policy_ == nullptr ? "" : "pending ", child_policy_name); } auto& lb_policy = @@ -2329,30 +2726,28 @@ void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( GPR_ASSERT(policy_to_update != nullptr); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] LocalityEntry %p %s: Updating %schild policy %p", - parent_.get(), this, name_->AsHumanReadableString(), + gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: Updating %schild policy %p", + locality_map_.get(), this, name_->AsHumanReadableString(), policy_to_update == pending_child_policy_.get() ? 
"pending " : "", policy_to_update); } policy_to_update->UpdateLocked(std::move(update_args)); } -void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() { +void XdsLb::PriorityList::LocalityMap::Locality::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { - gpr_log(GPR_INFO, - "[xdslb %p] LocalityEntry %p %s: shutting down locality entry", - parent_.get(), this, name_->AsHumanReadableString()); + gpr_log(GPR_INFO, "[xdslb %p] Locality %p %s: shutting down locality", + locality_map_.get(), this, name_->AsHumanReadableString()); } // Remove the child policy's interested_parties pollset_set from the // xDS policy. grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), - parent_->interested_parties()); + xds_policy()->interested_parties()); child_policy_.reset(); if (pending_child_policy_ != nullptr) { grpc_pollset_set_del_pollset_set( pending_child_policy_->interested_parties(), - parent_->interested_parties()); + xds_policy()->interested_parties()); pending_child_policy_.reset(); } // Drop our ref to the child's picker, in case it's holding a ref to @@ -2361,141 +2756,138 @@ void XdsLb::LocalityMap::LocalityEntry::ShutdownLocked() { if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); } + shutdown_ = true; } -void XdsLb::LocalityMap::LocalityEntry::ResetBackoffLocked() { +void XdsLb::PriorityList::LocalityMap::Locality::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); if (pending_child_policy_ != nullptr) { pending_child_policy_->ResetBackoffLocked(); } } -void XdsLb::LocalityMap::LocalityEntry::Orphan() { +void XdsLb::PriorityList::LocalityMap::Locality::Orphan() { ShutdownLocked(); Unref(); } -void XdsLb::LocalityMap::LocalityEntry::DeactivateLocked() { - // If locality retaining is disabled, delete the locality immediately. 
- if (parent_->locality_retention_interval_ms_ == 0) { - parent_->locality_map_.map_.erase(name_); - return; - } +void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() { // If already deactivated, don't do that again. - if (locality_weight_ == 0) return; + if (weight_ == 0) return; // Set the locality weight to 0 so that future xds picker won't contain this // locality. - locality_weight_ = 0; + weight_ = 0; // Start a timer to delete the locality. - Ref(DEBUG_LOCATION, "LocalityEntry+timer").release(); + Ref(DEBUG_LOCATION, "Locality+timer").release(); grpc_timer_init( &delayed_removal_timer_, - ExecCtx::Get()->Now() + parent_->locality_retention_interval_ms_, + ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_, &on_delayed_removal_timer_); delayed_removal_timer_callback_pending_ = true; } -void XdsLb::LocalityMap::LocalityEntry::OnDelayedRemovalTimerLocked( +void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked( void* arg, grpc_error* error) { - LocalityEntry* self = static_cast(arg); + Locality* self = static_cast(arg); self->delayed_removal_timer_callback_pending_ = false; - if (error == GRPC_ERROR_NONE && self->locality_weight_ == 0) { - self->parent_->locality_map_.map_.erase(self->name_); + if (error == GRPC_ERROR_NONE && !self->shutdown_ && self->weight_ == 0) { + self->locality_map_->localities_.erase(self->name_); } - self->Unref(DEBUG_LOCATION, "LocalityEntry+timer"); + self->Unref(DEBUG_LOCATION, "Locality+timer"); } // -// XdsLb::LocalityEntry::Helper +// XdsLb::Locality::Helper // -bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByPendingChild() const { +bool XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByPendingChild() + const { GPR_ASSERT(child_ != nullptr); - return child_ == entry_->pending_child_policy_.get(); + return child_ == locality_->pending_child_policy_.get(); } -bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const { +bool 
XdsLb::PriorityList::LocalityMap::Locality::Helper::CalledByCurrentChild() + const { GPR_ASSERT(child_ != nullptr); - return child_ == entry_->child_policy_.get(); + return child_ == locality_->child_policy_.get(); } RefCountedPtr -XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel( +XdsLb::PriorityList::LocalityMap::Locality::Helper::CreateSubchannel( const grpc_channel_args& args) { - if (entry_->parent_->shutting_down_ || + if (locality_->xds_policy()->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { return nullptr; } - return entry_->parent_->channel_control_helper()->CreateSubchannel(args); + return locality_->xds_policy()->channel_control_helper()->CreateSubchannel( + args); } -void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( +void XdsLb::PriorityList::LocalityMap::Locality::Helper::UpdateState( grpc_connectivity_state state, UniquePtr picker) { - if (entry_->parent_->shutting_down_) return; + if (locality_->xds_policy()->shutting_down_) return; // If this request is from the pending child policy, ignore it until // it reports READY, at which point we swap it into place. if (CalledByPendingChild()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p helper %p] pending child policy %p reports state=%s", - entry_->parent_.get(), this, entry_->pending_child_policy_.get(), + locality_->xds_policy(), this, + locality_->pending_child_policy_.get(), ConnectivityStateName(state)); } if (state != GRPC_CHANNEL_READY) return; grpc_pollset_set_del_pollset_set( - entry_->child_policy_->interested_parties(), - entry_->parent_->interested_parties()); - entry_->child_policy_ = std::move(entry_->pending_child_policy_); + locality_->child_policy_->interested_parties(), + locality_->xds_policy()->interested_parties()); + locality_->child_policy_ = std::move(locality_->pending_child_policy_); } else if (!CalledByCurrentChild()) { // This request is from an outdated child, so ignore it. 
return; } - // At this point, child_ must be the current child policy. - if (state == GRPC_CHANNEL_READY) { - entry_->parent_->MaybeCancelFallbackAtStartupChecks(); - entry_->parent_->MaybeExitFallbackMode(); - } - GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr); - // Cache the picker and its state in the entry. - entry_->picker_wrapper_ = MakeRefCounted( + GPR_ASSERT(locality_->xds_policy()->lb_chand_ != nullptr); + // Cache the picker and its state in the locality. + locality_->picker_wrapper_ = MakeRefCounted( std::move(picker), - entry_->parent_->client_stats_.FindLocalityStats(entry_->name_)); - entry_->connectivity_state_ = state; - // Construct a new xds picker and pass it to the channel. - entry_->parent_->locality_map_.UpdateXdsPickerLocked(); + locality_->xds_policy()->client_stats_.FindLocalityStats( + locality_->name_)); + locality_->connectivity_state_ = state; + // Notify the locality map. + locality_->locality_map_->OnLocalityStateUpdateLocked(); } -void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() { - if (entry_->parent_->shutting_down_) return; +void XdsLb::PriorityList::LocalityMap::Locality::Helper::RequestReresolution() { + if (locality_->xds_policy()->shutting_down_) return; // If there is a pending child policy, ignore re-resolution requests // from the current child policy (or any outdated child). 
- if (entry_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { + if (locality_->pending_child_policy_ != nullptr && !CalledByPendingChild()) { return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { gpr_log(GPR_INFO, "[xdslb %p] Re-resolution requested from the internal RR policy " "(%p).", - entry_->parent_.get(), entry_->child_policy_.get()); + locality_->xds_policy(), locality_->child_policy_.get()); } - GPR_ASSERT(entry_->parent_->lb_chand_ != nullptr); + GPR_ASSERT(locality_->xds_policy()->lb_chand_ != nullptr); // If we are talking to a balancer, we expect to get updated addresses // from the balancer, so we can ignore the re-resolution request from // the child policy. Otherwise, pass the re-resolution request up to the // channel. - if (entry_->parent_->lb_chand_->eds_calld() == nullptr || - !entry_->parent_->lb_chand_->eds_calld()->seen_response()) { - entry_->parent_->channel_control_helper()->RequestReresolution(); + if (locality_->xds_policy()->lb_chand_->eds_calld() == nullptr || + !locality_->xds_policy()->lb_chand_->eds_calld()->seen_response()) { + locality_->xds_policy()->channel_control_helper()->RequestReresolution(); } } -void XdsLb::LocalityMap::LocalityEntry::Helper::AddTraceEvent( +void XdsLb::PriorityList::LocalityMap::Locality::Helper::AddTraceEvent( TraceSeverity severity, StringView message) { - if (entry_->parent_->shutting_down_ || + if (locality_->xds_policy()->shutting_down_ || (!CalledByPendingChild() && !CalledByCurrentChild())) { return; } - entry_->parent_->channel_control_helper()->AddTraceEvent(severity, message); + locality_->xds_policy()->channel_control_helper()->AddTraceEvent(severity, + message); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc index e1954719204..a7666b34e53 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc +++ 
b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.cc @@ -53,6 +53,42 @@ constexpr char kEndpointRequired[] = "endpointRequired"; } // namespace +bool XdsPriorityListUpdate::operator==( + const XdsPriorityListUpdate& other) const { + if (priorities_.size() != other.priorities_.size()) return false; + for (size_t i = 0; i < priorities_.size(); ++i) { + if (priorities_[i].localities != other.priorities_[i].localities) { + return false; + } + } + return true; +} + +void XdsPriorityListUpdate::Add( + XdsPriorityListUpdate::LocalityMap::Locality locality) { + // Pad the missing priorities in case the localities are not ordered by + // priority. + // TODO(juanlishen): Implement InlinedVector::resize() and use that instead. + while (!Contains(locality.priority)) priorities_.emplace_back(); + LocalityMap& locality_map = priorities_[locality.priority]; + locality_map.localities.emplace(locality.name, std::move(locality)); +} + +const XdsPriorityListUpdate::LocalityMap* XdsPriorityListUpdate::Find( + uint32_t priority) const { + if (!Contains(priority)) return nullptr; + return &priorities_[priority]; +} + +bool XdsPriorityListUpdate::Contains( + const RefCountedPtr& name) { + for (size_t i = 0; i < priorities_.size(); ++i) { + const LocalityMap& locality_map = priorities_[i]; + if (locality_map.Contains(name)) return true; + } + return false; +} + bool XdsDropConfig::ShouldDrop(const UniquePtr** category_name) const { for (size_t i = 0; i < drop_category_list_.size(); ++i) { const auto& drop_category = drop_category_list_[i]; @@ -136,7 +172,7 @@ UniquePtr StringCopy(const upb_strview& strview) { grpc_error* LocalityParse( const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints, - XdsLocalityInfo* locality_info) { + XdsPriorityListUpdate::LocalityMap::Locality* output_locality) { // Parse LB weight. 
const google_protobuf_UInt32Value* lb_weight = envoy_api_v2_endpoint_LocalityLbEndpoints_load_balancing_weight( @@ -144,13 +180,13 @@ grpc_error* LocalityParse( // If LB weight is not specified, it means this locality is assigned no load. // TODO(juanlishen): When we support CDS to configure the inter-locality // policy, we should change the LB weight handling. - locality_info->lb_weight = + output_locality->lb_weight = lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0; - if (locality_info->lb_weight == 0) return GRPC_ERROR_NONE; + if (output_locality->lb_weight == 0) return GRPC_ERROR_NONE; // Parse locality name. const envoy_api_v2_core_Locality* locality = envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints); - locality_info->locality_name = MakeRefCounted( + output_locality->name = MakeRefCounted( StringCopy(envoy_api_v2_core_Locality_region(locality)), StringCopy(envoy_api_v2_core_Locality_zone(locality)), StringCopy(envoy_api_v2_core_Locality_sub_zone(locality))); @@ -160,12 +196,12 @@ grpc_error* LocalityParse( envoy_api_v2_endpoint_LocalityLbEndpoints_lb_endpoints( locality_lb_endpoints, &size); for (size_t i = 0; i < size; ++i) { - grpc_error* error = ServerAddressParseAndAppend(lb_endpoints[i], - &locality_info->serverlist); + grpc_error* error = ServerAddressParseAndAppend( + lb_endpoints[i], &output_locality->serverlist); if (error != GRPC_ERROR_NONE) return error; } // Parse the priority. 
- locality_info->priority = + output_locality->priority = envoy_api_v2_endpoint_LocalityLbEndpoints_priority(locality_lb_endpoints); return GRPC_ERROR_NONE; } @@ -253,18 +289,13 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response, envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment, &size); for (size_t i = 0; i < size; ++i) { - XdsLocalityInfo locality_info; - grpc_error* error = LocalityParse(endpoints[i], &locality_info); + XdsPriorityListUpdate::LocalityMap::Locality locality; + grpc_error* error = LocalityParse(endpoints[i], &locality); if (error != GRPC_ERROR_NONE) return error; // Filter out locality with weight 0. - if (locality_info.lb_weight == 0) continue; - update->locality_list.push_back(std::move(locality_info)); + if (locality.lb_weight == 0) continue; + update->priority_list_update.Add(locality); } - // The locality list is sorted here into deterministic order so that it's - // easier to check if two locality lists contain the same set of localities. - std::sort(update->locality_list.data(), - update->locality_list.data() + update->locality_list.size(), - XdsLocalityInfo::Less()); // Get the drop config. 
update->drop_config = MakeRefCounted(); const envoy_api_v2_ClusterLoadAssignment_Policy* policy = diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h index 03155d8450b..1b56bef7d81 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_load_balancer_api.h @@ -23,33 +23,66 @@ #include +#include #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" #include "src/core/ext/filters/client_channel/server_address.h" namespace grpc_core { -struct XdsLocalityInfo { - bool operator==(const XdsLocalityInfo& other) const { - return *locality_name == *other.locality_name && - serverlist == other.serverlist && lb_weight == other.lb_weight && - priority == other.priority; - } - - // This comparator only compares the locality names. - struct Less { - bool operator()(const XdsLocalityInfo& lhs, - const XdsLocalityInfo& rhs) const { - return XdsLocalityName::Less()(lhs.locality_name, rhs.locality_name); +class XdsPriorityListUpdate { + public: + struct LocalityMap { + struct Locality { + bool operator==(const Locality& other) const { + return *name == *other.name && serverlist == other.serverlist && + lb_weight == other.lb_weight && priority == other.priority; + } + + // This comparator only compares the locality names. 
+ struct Less { + bool operator()(const Locality& lhs, const Locality& rhs) const { + return XdsLocalityName::Less()(lhs.name, rhs.name); + } + }; + + RefCountedPtr name; + ServerAddressList serverlist; + uint32_t lb_weight; + uint32_t priority; + }; + + bool Contains(const RefCountedPtr& name) const { + return localities.find(name) != localities.end(); } + + size_t size() const { return localities.size(); } + + Map, Locality, XdsLocalityName::Less> + localities; }; - RefCountedPtr locality_name; - ServerAddressList serverlist; - uint32_t lb_weight; - uint32_t priority; -}; + bool operator==(const XdsPriorityListUpdate& other) const; + + void Add(LocalityMap::Locality locality); + + const LocalityMap* Find(uint32_t priority) const; -using XdsLocalityList = InlinedVector; + bool Contains(uint32_t priority) const { + return priority < priorities_.size(); + } + bool Contains(const RefCountedPtr& name); + + bool empty() const { return priorities_.empty(); } + size_t size() const { return priorities_.size(); } + + // Callers should make sure the priority list is non-empty. + uint32_t LowestPriority() const { + return static_cast(priorities_.size()) - 1; + } + + private: + InlinedVector priorities_; +}; // There are two phases of accessing this class's content: // 1. 
to initialize in the control plane combiner; @@ -93,7 +126,7 @@ class XdsDropConfig : public RefCounted { }; struct XdsUpdate { - XdsLocalityList locality_list; + XdsPriorityListUpdate priority_list_update; RefCountedPtr drop_config; bool drop_all = false; }; diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index c5ae0e8e65d..74e012e1623 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -107,6 +107,10 @@ class InlinedVector { return true; } + bool operator!=(const InlinedVector& other) const { + return !(*this == other); + } + void reserve(size_t capacity) { if (capacity > capacity_) { T* new_dynamic = diff --git a/test/core/gprpp/inlined_vector_test.cc b/test/core/gprpp/inlined_vector_test.cc index 4b7e46761c0..f2279992ee0 100644 --- a/test/core/gprpp/inlined_vector_test.cc +++ b/test/core/gprpp/inlined_vector_test.cc @@ -127,6 +127,24 @@ TEST(InlinedVectorTest, EqualOperator) { EXPECT_FALSE(v1 == v2); } +TEST(InlinedVectorTest, NotEqualOperator) { + constexpr int kNumElements = 10; + // Both v1 and v2 are empty. + InlinedVector v1; + InlinedVector v2; + EXPECT_FALSE(v1 != v2); + // Both v1 and v2 contain the same data. + FillVector(&v1, kNumElements); + FillVector(&v2, kNumElements); + EXPECT_FALSE(v1 != v2); + // The sizes of v1 and v2 are different. + v1.push_back(0); + EXPECT_TRUE(v1 != v2); + // The contents of v1 and v2 are different although their sizes are the same. 
+ v2.push_back(1); + EXPECT_TRUE(v1 != v2); +} + // the following constants and typedefs are used for copy/move // construction/assignment const size_t kInlinedLength = 8; diff --git a/test/core/util/ubsan_suppressions.txt b/test/core/util/ubsan_suppressions.txt index 06533d9eb62..015f4b89d78 100644 --- a/test/core/util/ubsan_suppressions.txt +++ b/test/core/util/ubsan_suppressions.txt @@ -22,3 +22,7 @@ signed-integer-overflow:chrono enum:grpc_http2_error_to_grpc_status enum:grpc_chttp2_cancel_stream enum:api_fuzzer +# TODO(juanlishen): Remove this suppression after +# https://github.com/GoogleCloudPlatform/layer-definitions/issues/531 is +# addressed. +alignment:grpc_core::XdsPriorityListUpdate::* diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 2a34b5a8151..942216028e6 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -99,6 +99,7 @@ constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone"; constexpr char kLbDropType[] = "lb"; constexpr char kThrottleDropType[] = "throttle"; constexpr int kDefaultLocalityWeight = 3; +constexpr int kDefaultLocalityPriority = 0; template class CountedService : public ServiceType { @@ -262,7 +263,8 @@ class EdsServiceImpl : public EdsService { struct ResponseArgs { struct Locality { Locality(const grpc::string& sub_zone, std::vector ports, - int lb_weight = kDefaultLocalityWeight, int priority = 0) + int lb_weight = kDefaultLocalityWeight, + int priority = kDefaultLocalityPriority) : sub_zone(std::move(sub_zone)), ports(std::move(ports)), lb_weight(lb_weight), @@ -566,7 +568,7 @@ class XdsEnd2endTest : public ::testing::Test { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } - void ResetStub(int fallback_timeout = 0, + void ResetStub(int fallback_timeout = 0, int failover_timeout = 0, const grpc::string& expected_targets = "", grpc::string scheme = "") { ChannelArguments args; @@ -574,6 +576,9 @@ class 
XdsEnd2endTest : public ::testing::Test { if (fallback_timeout > 0) { args.SetInt(GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS, fallback_timeout); } + if (failover_timeout > 0) { + args.SetInt(GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS, failover_timeout); + } args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_.get()); if (!expected_targets.empty()) { @@ -922,7 +927,7 @@ class XdsResolverTest : public XdsEnd2endTest { // used. TEST_F(XdsResolverTest, XdsResolverIsUsed) { // Use xds-experimental scheme in URI. - ResetStub(0, "", "xds-experimental"); + ResetStub(0, 0, "", "xds-experimental"); // Send an RPC to trigger resolution. auto unused_result = SendRpc(); // Xds resolver returns xds_experimental as the LB policy. @@ -1067,7 +1072,7 @@ using SecureNamingTest = BasicTest; // Tests that secure naming check passes if target name is expected. TEST_F(SecureNamingTest, TargetNameIsExpected) { // TODO(juanlishen): Use separate fake creds for the balancer channel. - ResetStub(0, kApplicationTargetName_ + ";lb"); + ResetStub(0, 0, kApplicationTargetName_ + ";lb"); SetNextResolution({}, kDefaultServiceConfig_.c_str()); SetNextResolutionForLbChannel({balancers_[0]->port()}); const size_t kNumRpcsPerAddress = 100; @@ -1098,7 +1103,7 @@ TEST_F(SecureNamingTest, TargetNameIsUnexpected) { // the name from the balancer doesn't match expectations. ASSERT_DEATH_IF_SUPPORTED( { - ResetStub(0, kApplicationTargetName_ + ";lb"); + ResetStub(0, 0, kApplicationTargetName_ + ";lb"); SetNextResolution({}, "{\n" " \"loadBalancingConfig\":[\n" @@ -1287,6 +1292,151 @@ TEST_F(LocalityMapTest, UpdateMap) { EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count()); } +class FailoverTest : public BasicTest { + public: + FailoverTest() { ResetStub(0, 100, "", ""); } +}; + +// Localities with the highest priority are used when multiple priorities exist. 
+TEST_F(FailoverTest, ChooseHighestPriority) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + EdsServiceImpl::ResponseArgs args({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0}, + }); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0); + WaitForBackend(3, false); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + } + // The EDS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +// If the higher priority localities are not reachable, failover to the highest +// priority among the rest. +TEST_F(FailoverTest, Failover) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + EdsServiceImpl::ResponseArgs args({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0}, + }); + ShutdownBackend(3); + ShutdownBackend(0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0); + WaitForBackend(1, false); + for (size_t i = 0; i < 4; ++i) { + if (i == 1) continue; + EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + } + // The EDS service got a single request, and sent a single response. 
+ EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +// If a locality with higher priority than the current one becomes ready, +// switch to it. +TEST_F(FailoverTest, SwitchBackToHigherPriority) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 100; + EdsServiceImpl::ResponseArgs args({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0}, + }); + ShutdownBackend(3); + ShutdownBackend(0); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0); + WaitForBackend(1, false); + for (size_t i = 0; i < 4; ++i) { + if (i == 1) continue; + EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + } + StartBackend(0); + WaitForBackend(0); + CheckRpcSendOk(kNumRpcs); + EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count()); + // The EDS service got a single request, and sent a single response. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(1U, balancers_[0]->eds_service()->response_count()); +} + +// The first update only contains unavailable priorities. The second update +// contains available priorities. 
+TEST_F(FailoverTest, UpdateInitialUnavailable) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + EdsServiceImpl::ResponseArgs args({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1}, + }); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0); + args = EdsServiceImpl::ResponseArgs({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 0}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 1}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 2}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3}, + }); + ShutdownBackend(0); + ShutdownBackend(1); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 1000); + gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(500, GPR_TIMESPAN)); + // Send 0.5 second worth of RPCs. + do { + CheckRpcSendFailure(); + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0); + WaitForBackend(2, false); + for (size_t i = 0; i < 4; ++i) { + if (i == 2) continue; + EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + } + // The EDS service got a single request, and sent two responses. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count()); +} + +// Tests that after the localities' priorities are updated, we still choose the +// highest READY priority with the updated localities.
+TEST_F(FailoverTest, UpdatePriority) { + SetNextResolution({}, kDefaultServiceConfig_.c_str()); + SetNextResolutionForLbChannelAllBalancers(); + const size_t kNumRpcs = 100; + EdsServiceImpl::ResponseArgs args({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 1}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 2}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 3}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 0}, + }); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 0); + args = EdsServiceImpl::ResponseArgs({ + {"locality0", GetBackendPorts(0, 1), kDefaultLocalityWeight, 2}, + {"locality1", GetBackendPorts(1, 2), kDefaultLocalityWeight, 0}, + {"locality2", GetBackendPorts(2, 3), kDefaultLocalityWeight, 1}, + {"locality3", GetBackendPorts(3, 4), kDefaultLocalityWeight, 3}, + }); + ScheduleResponseForBalancer(0, EdsServiceImpl::BuildResponse(args), 1000); + WaitForBackend(3, false); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(0, backends_[i]->backend_service()->request_count()); + } + WaitForBackend(1); + CheckRpcSendOk(kNumRpcs); + EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count()); + // The EDS service got a single request, and sent two responses. + EXPECT_EQ(1U, balancers_[0]->eds_service()->request_count()); + EXPECT_EQ(2U, balancers_[0]->eds_service()->response_count()); +} + using DropTest = BasicTest; // Tests that RPCs are dropped according to the drop config. @@ -1760,9 +1910,9 @@ TEST_F(FallbackTest, FallbackModeIsExitedAfterChildRready) { SetNextResolutionForLbChannelAllBalancers(); // The state (TRANSIENT_FAILURE) update from the child policy will be ignored // because we are still in fallback mode. - gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(5000, GPR_TIMESPAN)); - // Send 5 seconds worth of RPCs.
+ gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(500, GPR_TIMESPAN)); + // Send 0.5 second worth of RPCs. do { CheckRpcSendOk(); } while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0);