ring_hash: don't recreate ring when individual subchannel states change (#28338)

pull/28103/head
Mark D. Roth 3 years ago committed by GitHub
parent 90a56a3d15
commit a9a14a461b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 144
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  2. 25
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h

@ -106,6 +106,7 @@ class RingHashLbConfig : public LoadBalancingPolicy::Config {
//
// ring_hash LB policy
//
class RingHash : public LoadBalancingPolicy {
public:
explicit RingHash(Args args);
@ -118,8 +119,9 @@ class RingHash : public LoadBalancingPolicy {
private:
~RingHash() override;
// Forward declaration.
// Forward declarations.
class RingHashSubchannelList;
class Ring;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
@ -136,9 +138,10 @@ class RingHash : public LoadBalancingPolicy {
: SubchannelData(subchannel_list, address, std::move(subchannel)),
address_(address) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
grpc_connectivity_state GetConnectivityState() const {
return connectivity_state_for_picker_.load(std::memory_order_relaxed);
}
const ServerAddress& address() const { return address_; }
bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
@ -156,6 +159,8 @@ class RingHash : public LoadBalancingPolicy {
ServerAddress address_;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
std::atomic<grpc_connectivity_state> connectivity_state_for_picker_{
GRPC_CHANNEL_IDLE};
bool seen_failure_since_ready_ = false;
};
@ -193,6 +198,9 @@ class RingHash : public LoadBalancingPolicy {
// Transient Failure.
bool UpdateRingHashConnectivityStateLocked();
// Create a new ring from this subchannel list.
RefCountedPtr<Ring> MakeRing();
private:
size_t num_idle_ = 0;
size_t num_ready_ = 0;
@ -200,20 +208,31 @@ class RingHash : public LoadBalancingPolicy {
size_t num_transient_failure_ = 0;
};
// A hash ring built from a subchannel list.
//
// The ring is ref-counted so that it can be shared: the policy keeps the
// current ring in its ring_ member and hands a ref to each Picker it
// creates, instead of every picker building its own copy.
class Ring : public RefCounted<Ring> {
public:
// One slot on the ring.
struct Entry {
// XXH64 hash value computed for this slot by the constructor.
uint64_t hash;
// Non-owning pointer to the subchannel data for this slot.  The pointee
// is an element of the subchannel list, to which this Ring holds a ref
// (subchannel_list_).
RingHashSubchannelData* subchannel;
};
// Builds the ring entries from the subchannels in subchannel_list.
// parent supplies the ring-size limits (config_->min_ring_size() /
// max_ring_size()) and is used as a tag in trace logging; it is not stored.
Ring(RingHash* parent,
RefCountedPtr<RingHashSubchannelList> subchannel_list);
// Returns the ring entries, sorted by ascending hash.
const std::vector<Entry>& ring() const { return ring_; }
private:
// Ref to the list whose subchannel data the entries in ring_ point into.
RefCountedPtr<RingHashSubchannelList> subchannel_list_;
// Ring entries; sorted by hash at the end of construction.
std::vector<Entry> ring_;
};
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<RingHash> parent,
RingHashSubchannelList* subchannel_list);
Picker(RefCountedPtr<RingHash> parent, RefCountedPtr<Ring> ring)
: parent_(std::move(parent)), ring_(std::move(ring)) {}
PickResult Pick(PickArgs args) override;
private:
struct RingEntry {
uint64_t hash;
RefCountedPtr<SubchannelInterface> subchannel;
grpc_connectivity_state connectivity_state;
};
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public Orphanable {
@ -255,9 +274,7 @@ class RingHash : public LoadBalancingPolicy {
};
RefCountedPtr<RingHash> parent_;
// A ring of subchannels.
std::vector<RingEntry> ring_;
RefCountedPtr<Ring> ring_;
};
void ShutdownLocked() override;
@ -269,16 +286,19 @@ class RingHash : public LoadBalancingPolicy {
OrphanablePtr<RingHashSubchannelList> subchannel_list_;
// indicating if we are shutting down.
bool shutdown_ = false;
// Current ring.
RefCountedPtr<Ring> ring_;
};
//
// RingHash::Picker
// RingHash::Ring
//
RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
RingHashSubchannelList* subchannel_list)
: parent_(std::move(parent)) {
size_t num_subchannels = subchannel_list->num_subchannels();
RingHash::Ring::Ring(RingHash* parent,
RefCountedPtr<RingHashSubchannelList> subchannel_list)
: subchannel_list_(std::move(subchannel_list)) {
size_t num_subchannels = subchannel_list_->num_subchannels();
// Store the weights while finding the sum.
struct AddressWeight {
std::string address;
@ -291,7 +311,7 @@ RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
size_t sum = 0;
address_weights.reserve(num_subchannels);
for (size_t i = 0; i < num_subchannels; ++i) {
RingHashSubchannelData* sd = subchannel_list->subchannel(i);
RingHashSubchannelData* sd = subchannel_list_->subchannel(i);
const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
@ -322,8 +342,8 @@ RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
// weights aren't provided, all hosts should get an equal number of hashes. In
// the case where this number exceeds the max_ring_size, it's scaled back down
// to fit.
const size_t min_ring_size = parent_->config_->min_ring_size();
const size_t max_ring_size = parent_->config_->max_ring_size();
const size_t min_ring_size = parent->config_->min_ring_size();
const size_t max_ring_size = parent->config_->max_ring_size();
const double scale = std::min(
std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
static_cast<double>(max_ring_size));
@ -347,17 +367,13 @@ RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
auto offset_start = hash_key_buffer.end();
target_hashes += scale * address_weights[i].normalized_weight;
size_t count = 0;
auto current_state =
subchannel_list->subchannel(i)->subchannel()->CheckConnectivityState();
while (current_hashes < target_hashes) {
const std::string count_str = absl::StrCat(count);
hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
absl::string_view hash_key(hash_key_buffer.data(),
hash_key_buffer.size());
const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
ring_.push_back({hash,
subchannel_list->subchannel(i)->subchannel()->Ref(),
current_state});
ring_.push_back({hash, subchannel_list_->subchannel(i)});
++count;
++current_hashes;
hash_key_buffer.erase(offset_start, hash_key_buffer.end());
@ -368,17 +384,21 @@ RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
std::max(static_cast<uint64_t>(i), max_hashes_per_host);
}
std::sort(ring_.begin(), ring_.end(),
[](const RingEntry& lhs, const RingEntry& rhs) -> bool {
[](const Entry& lhs, const Entry& rhs) -> bool {
return lhs.hash < rhs.hash;
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p picker %p] created picker from subchannel_list=%p "
"[RH %p picker %p] created ring from subchannel_list=%p "
"with %" PRIuPTR " ring entries",
parent_.get(), this, subchannel_list, ring_.size());
parent, this, subchannel_list_.get(), ring_.size());
}
}
//
// RingHash::Picker
//
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
auto hash =
args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
@ -387,20 +407,21 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
return PickResult::Fail(
absl::InternalError("xds ring hash value is not a number"));
}
const std::vector<Ring::Entry>& ring = ring_->ring();
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
// (ketama_get_server) NOTE: The algorithm depends on using signed integers
// for lowp, highp, and first_index. Do not change them!
int64_t lowp = 0;
int64_t highp = ring_.size();
int64_t highp = ring.size();
int64_t first_index = 0;
while (true) {
first_index = (lowp + highp) / 2;
if (first_index == static_cast<int64_t>(ring_.size())) {
if (first_index == static_cast<int64_t>(ring.size())) {
first_index = 0;
break;
}
uint64_t midval = ring_[first_index].hash;
uint64_t midval1 = first_index == 0 ? 0 : ring_[first_index - 1].hash;
uint64_t midval = ring[first_index].hash;
uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash;
if (h <= midval && h > midval1) {
break;
}
@ -423,35 +444,41 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
}
subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
};
switch (ring_[first_index].connectivity_state) {
switch (ring[first_index].subchannel->GetConnectivityState()) {
case GRPC_CHANNEL_READY:
return PickResult::Complete(ring_[first_index].subchannel);
return PickResult::Complete(
ring[first_index].subchannel->subchannel()->Ref());
case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
ScheduleSubchannelConnectionAttempt(
ring[first_index].subchannel->subchannel()->Ref());
ABSL_FALLTHROUGH_INTENDED;
case GRPC_CHANNEL_CONNECTING:
return PickResult::Queue();
default: // GRPC_CHANNEL_TRANSIENT_FAILURE
break;
}
ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
ScheduleSubchannelConnectionAttempt(
ring[first_index].subchannel->subchannel()->Ref());
// Loop through remaining subchannels to find one in READY.
// On the way, we make sure the right set of connection attempts
// will happen.
bool found_second_subchannel = false;
bool found_first_non_failed = false;
for (size_t i = 1; i < ring_.size(); ++i) {
const RingEntry& entry = ring_[(first_index + i) % ring_.size()];
if (entry.subchannel == ring_[first_index].subchannel) {
for (size_t i = 1; i < ring.size(); ++i) {
const Ring::Entry& entry = ring[(first_index + i) % ring.size()];
if (entry.subchannel == ring[first_index].subchannel) {
continue;
}
if (entry.connectivity_state == GRPC_CHANNEL_READY) {
return PickResult::Complete(entry.subchannel);
grpc_connectivity_state connectivity_state =
entry.subchannel->GetConnectivityState();
if (connectivity_state == GRPC_CHANNEL_READY) {
return PickResult::Complete(entry.subchannel->subchannel()->Ref());
}
if (!found_second_subchannel) {
switch (entry.connectivity_state) {
switch (connectivity_state) {
case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt(entry.subchannel);
ScheduleSubchannelConnectionAttempt(
entry.subchannel->subchannel()->Ref());
ABSL_FALLTHROUGH_INTENDED;
case GRPC_CHANNEL_CONNECTING:
return PickResult::Queue();
@ -461,11 +488,13 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
found_second_subchannel = true;
}
if (!found_first_non_failed) {
if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
ScheduleSubchannelConnectionAttempt(entry.subchannel);
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
ScheduleSubchannelConnectionAttempt(
entry.subchannel->subchannel()->Ref());
} else {
if (entry.connectivity_state == GRPC_CHANNEL_IDLE) {
ScheduleSubchannelConnectionAttempt(entry.subchannel);
if (connectivity_state == GRPC_CHANNEL_IDLE) {
ScheduleSubchannelConnectionAttempt(
entry.subchannel->subchannel()->Ref());
}
found_first_non_failed = true;
}
@ -498,7 +527,7 @@ void RingHash::RingHashSubchannelList::StartWatchingLocked() {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
p->ring_));
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
@ -547,7 +576,7 @@ bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
p->ring_));
return false;
}
if (num_connecting_ > 0 && num_transient_failure_ < 2) {
@ -560,7 +589,7 @@ bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
p->ring_));
return false;
}
absl::Status status =
@ -571,6 +600,11 @@ bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
return true;
}
// Creates a new Ring from this subchannel list.
//
// The returned Ring is given its own ref to this list (tagged "Ring"), and
// the policy that owns this list is passed as the Ring's parent.
RefCountedPtr<RingHash::Ring> RingHash::RingHashSubchannelList::MakeRing() {
// policy() is downcast to the concrete ring_hash policy type.
RingHash* p = static_cast<RingHash*>(policy());
return MakeRefCounted<Ring>(p, Ref(DEBUG_LOCATION, "Ring"));
}
//
// RingHash::RingHashSubchannelData
//
@ -614,6 +648,9 @@ void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// Update connectivity state used by picker.
connectivity_state_for_picker_.store(connectivity_state,
std::memory_order_relaxed);
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
@ -680,6 +717,7 @@ void RingHash::ShutdownLocked() {
}
shutdown_ = true;
subchannel_list_.reset();
ring_.reset(DEBUG_LOCATION, "RingHash");
}
void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
@ -723,6 +761,8 @@ void RingHash::UpdateLocked(UpdateArgs args) {
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
} else {
// Build the ring.
ring_ = subchannel_list_->MakeRing();
// Start watching the new list.
subchannel_list_->StartWatchingLocked();
}

@ -29,13 +29,11 @@
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
// TODO(roth): Should not need the include of subchannel.h here, since
// that implementation should be hidden from the LB policy API.
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_interface.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -175,13 +173,18 @@ class SubchannelData {
template <typename SubchannelListType, typename SubchannelDataType>
class SubchannelList : public InternallyRefCounted<SubchannelListType> {
public:
typedef absl::InlinedVector<SubchannelDataType, 10> SubchannelVector;
// We use ManualConstructor here to support SubchannelDataType classes
// that are not copyable.
typedef absl::InlinedVector<ManualConstructor<SubchannelDataType>, 10>
SubchannelVector;
// The number of subchannels in the list.
size_t num_subchannels() const { return subchannels_.size(); }
// The data for the subchannel at a particular index.
SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
// Returns a pointer to the subchannel data stored at position index.
// Elements of subchannels_ are ManualConstructor wrappers, so get() is used
// to reach the wrapped SubchannelDataType.  This accessor performs no range
// check on index itself.
SubchannelDataType* subchannel(size_t index) {
return subchannels_[index].get();
}
// Returns true if the subchannel list is shutting down.
bool shutting_down() const { return shutting_down_; }
@ -386,7 +389,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
tracer_->name(), policy_, this, subchannels_.size(),
subchannel.get(), address.ToString().c_str());
}
subchannels_.emplace_back(this, std::move(address), std::move(subchannel));
subchannels_.emplace_back();
subchannels_.back().Init(this, std::move(address), std::move(subchannel));
}
}
@ -396,6 +400,9 @@ SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
policy_, this);
}
for (auto& sd : subchannels_) {
sd.Destroy();
}
}
template <typename SubchannelListType, typename SubchannelDataType>
@ -406,8 +413,7 @@ void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
}
GPR_ASSERT(!shutting_down_);
shutting_down_ = true;
for (size_t i = 0; i < subchannels_.size(); i++) {
SubchannelDataType* sd = &subchannels_[i];
for (auto& sd : subchannels_) {
sd->ShutdownLocked();
}
}
@ -415,8 +421,7 @@ void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelList<SubchannelListType,
SubchannelDataType>::ResetBackoffLocked() {
for (size_t i = 0; i < subchannels_.size(); i++) {
SubchannelDataType* sd = &subchannels_[i];
for (auto& sd : subchannels_) {
sd->ResetBackoffLocked();
}
}

Loading…
Cancel
Save