From d48b0d2879b62b02c747fdac814c65fcb3c30de2 Mon Sep 17 00:00:00 2001 From: Moiz Haidry Date: Fri, 19 Apr 2019 10:01:50 -0700 Subject: [PATCH] Created consolidated xds picker that stored and picks from the locality pickers --- .../client_channel/lb_policy/xds/xds.cc | 161 ++++++++++++++++-- 1 file changed, 143 insertions(+), 18 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index dd782ac53c3..000de59448b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -118,6 +118,7 @@ namespace { constexpr char kXds[] = "xds_experimental"; constexpr char kDefaultLocalityName[] = "xds_default_locality"; +constexpr uint32_t kDefaultLocalityWeight = 3; class XdsLb : public LoadBalancingPolicy { public: @@ -259,26 +260,51 @@ class XdsLb : public LoadBalancingPolicy { bool retry_timer_callback_pending_ = false; }; + // Since pickers are UniquePtrs we use this RefCounted wrapper + // to control references to it by the xds picker and the locality + // entry + class PickerRef : public RefCounted { + public: + explicit PickerRef(UniquePtr picker) + : picker_(std::move(picker)) {} + PickResult Pick(PickArgs* pick, grpc_error** error) { + return picker_->Pick(pick, error); + } + + private: + UniquePtr picker_; + }; + + // The picker will use a stateless weighting algorithm to pick the locality to + // use for each request. class Picker : public SubchannelPicker { public: - Picker(UniquePtr child_picker, - RefCountedPtr client_stats) - : child_picker_(std::move(child_picker)), - client_stats_(std::move(client_stats)) {} + // Maintains a weighted list of pickers from each locality that is in ready + // state. The first element in the pair represents the end of a range + // proportional to the locality's weight. The start of the range is the + // previous value in the vector and is 0 for the first element. + using PickerList = + InlinedVector>, 1>; + Picker(RefCountedPtr client_stats, PickerList pickers) + : client_stats_(std::move(client_stats)), + pickers_(std::move(pickers)) {} PickResult Pick(PickArgs* pick, grpc_error** error) override; private: - UniquePtr child_picker_; + // Calls the picker of the locality that the key falls within + PickResult PickFromLocality(const uint32_t key, PickArgs* pick, + grpc_error** error); RefCountedPtr client_stats_; + PickerList pickers_; }; class LocalityMap { public: class LocalityEntry : public InternallyRefCounted { public: - explicit LocalityEntry(RefCountedPtr parent) - : parent_(std::move(parent)) {} + LocalityEntry(RefCountedPtr parent, uint32_t locality_weight) + : parent_(std::move(parent)), locality_weight_(locality_weight) {} ~LocalityEntry() = default; void UpdateLocked(xds_grpclb_serverlist* serverlist, @@ -323,6 +349,9 @@ class XdsLb : public LoadBalancingPolicy { // pending_child_policy_. Mutex child_policy_mu_; RefCountedPtr parent_; + RefCountedPtr picker_ref_; + grpc_connectivity_state connectivity_state_; + uint32_t locality_weight_; }; void UpdateLocked(const LocalityList& locality_list, @@ -346,7 +375,9 @@ class XdsLb : public LoadBalancingPolicy { gpr_free(locality_name); xds_grpclb_destroy_serverlist(serverlist); } + char* locality_name; + uint32_t locality_weight; // The deserialized response from the balancer. May be nullptr until one // such response has arrived. xds_grpclb_serverlist* serverlist; @@ -412,6 +443,8 @@ class XdsLb : public LoadBalancingPolicy { RefCountedPtr child_policy_config_; // Map of policies to use in the backend LocalityMap locality_map_; + // TODO(mhaidry) : Add support for multiple maps of localities + // with different priorities LocalityList locality_serverlist_; // TODO(mhaidry) : Add a pending locality map that may be swapped with the // the current one when new localities in the pending map are ready @@ -424,8 +457,12 @@ class XdsLb : public LoadBalancingPolicy { XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) { // TODO(roth): Add support for drop handling. - // Forward pick to child policy. - PickResult result = child_picker_->Pick(pick, error); + // Generate a random number between 0 and the total weight + const uint32_t key = + (rand() * pickers_[pickers_.size() - 1].first) / RAND_MAX; + // Forward pick to whichever locality maps to the range in which the + // random number falls in. + PickResult result = PickFromLocality(key, pick, error); // If pick succeeded, add client stats. if (result == PickResult::PICK_COMPLETE && pick->connected_subchannel != nullptr && client_stats_ != nullptr) { @@ -434,6 +471,29 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) { return result; } +XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key, + PickArgs* pick, + grpc_error** error) { + size_t mid = 0; + size_t start_index = 0; + size_t end_index = pickers_.size() - 1; + size_t index = 0; + while (end_index > start_index) { + mid = (start_index + end_index) / 2; + if (pickers_[mid].first > key) { + end_index = mid; + } else if (pickers_[mid].first < key) { + start_index = mid + 1; + } else { + index = mid + 1; + break; + } + } + if (index == 0) index = start_index; + GPR_ASSERT(pickers_[index].first > key); + return pickers_[index].second->Pick(pick, error); +} + // // serverlist parsing code // @@ -935,6 +995,8 @@ void XdsLb::BalancerChannelState::BalancerCallState:: MakeUnique()); xdslb_policy->locality_serverlist_[0]->locality_name = static_cast(gpr_strdup(kDefaultLocalityName)); + xdslb_policy->locality_serverlist_[0]->locality_weight = + kDefaultLocalityWeight; } // and update the copy in the XdsLb instance. This // serverlist instance will be destroyed either upon the next @@ -1316,8 +1378,8 @@ void XdsLb::LocalityMap::UpdateLocked( gpr_strdup(locality_serverlist[i]->locality_name)); auto iter = map_.find(locality_name); if (iter == map_.end()) { - OrphanablePtr new_entry = - MakeOrphanable(parent->Ref()); + OrphanablePtr new_entry = MakeOrphanable( + parent->Ref(), locality_serverlist[i]->locality_weight); MutexLock lock(&child_refs_mu_); iter = map_.emplace(std::move(locality_name), std::move(new_entry)).first; } @@ -1335,8 +1397,8 @@ void grpc_core::XdsLb::LocalityMap::ShutdownLocked() { } void grpc_core::XdsLb::LocalityMap::ResetBackoffLocked() { - for (auto iter = map_.begin(); iter != map_.end(); iter++) { - iter->second->ResetBackoffLocked(); + for (auto& p : map_) { + p.second->ResetBackoffLocked(); } } @@ -1344,8 +1406,8 @@ void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz( channelz::ChildRefsList* child_subchannels, channelz::ChildRefsList* child_channels) { MutexLock lock(&child_refs_mu_); - for (auto iter = map_.begin(); iter != map_.end(); iter++) { - iter->second->FillChildRefsForChannelz(child_subchannels, child_channels); + for (auto& p : map_) { + p.second->FillChildRefsForChannelz(child_subchannels, child_channels); } } @@ -1617,9 +1679,72 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState( entry_->parent_->lb_chand_->lb_calld() == nullptr ? nullptr : entry_->parent_->lb_chand_->lb_calld()->client_stats(); - entry_->parent_->channel_control_helper()->UpdateState( - state, UniquePtr( - New(std::move(picker), std::move(client_stats)))); + // Cache the picker and its state in the entry + entry_->picker_ref_ = MakeRefCounted(std::move(picker)); + entry_->connectivity_state_ = state; + // Construct a new xds picker which maintains a map of all locality pickers + // that are ready. Each locality is represented by a portion of the range + // proportional to its weight, such that the total range is the sum of the + // weights of all localities + uint32_t end = 0; + size_t num_connecting = 0; + size_t num_idle = 0; + size_t num_transient_failures = 0; + auto& locality_map = this->entry_->parent_->locality_map_.map_; + Picker::PickerList pickers; + for (auto& p : locality_map) { + const LocalityEntry* entry = p.second.get(); + grpc_connectivity_state connectivity_state = entry->connectivity_state_; + switch (connectivity_state) { + case GRPC_CHANNEL_READY: { + end += entry->locality_weight_; + pickers.push_back(MakePair(end, entry->picker_ref_)); + break; + } + case GRPC_CHANNEL_CONNECTING: { + num_connecting++; + break; + } + case GRPC_CHANNEL_IDLE: { + num_idle++; + break; + } + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + num_transient_failures++; + break; + } + default: { + gpr_log(GPR_ERROR, "Invalid locality connectivity state - %d", + connectivity_state); + } + } + } + // Pass on the constructed xds picker if it has any ready pickers in their map + // otherwise pass a QueuePicker if any of the locality pickers are in a + // connecting or idle state, finally return a transient failure picker if all + // locality pickers are in transient failure + if (pickers.size() > 0) { + entry_->parent_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, + UniquePtr( + New(std::move(client_stats), std::move(pickers)))); + } else if (num_connecting > 0) { + entry_->parent_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_CONNECTING, + UniquePtr(New(this->entry_->parent_))); + } else if (num_idle > 0) { + entry_->parent_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_IDLE, + UniquePtr(New(this->entry_->parent_))); + } else { + GPR_ASSERT(num_transient_failures == locality_map.size()); + grpc_error* error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connections to all localities failing"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + entry_->parent_->channel_control_helper()->UpdateState( + state, UniquePtr(New(error))); + } } void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() {