ring_hash LB policy (#25697)

* Ring Hash Policy implementation

* Code review comment fixing

* Fixing code review comments.

* Code review comment fixing

* Fixing reconnect logic

* adding helper method for pick

* Holding on to ref to parent

* first attempt at calling AttemptToConnect

* Fixing state change

* Fixing code review comments

* Fixing the reconnect from channel watcher code

* Fixing the BUILD to include new policy

* Fixing major code review suggestion

* Fixing code review comments

* Fixing code review suggestions

* Initial 2 tests.

* Adding channel id case.

* Fixing code review comments.

* Small change to get the spread of backends

* Add header hashing tests

* Added more tests and debugging

* Fixing Header hash

* Added more tests

* cleanup

* removing debugs

* Fixing code review comments.

* code review fixing

* combining code and match design

* fixing code review comments.

* Fixed IDLE case

* Moving tests

* Fixing code review comments

* Adding more tests according to code review comments.

* Added tests with differetn types of weights

* Adding terminal policy case

* Remove hash_func as there is only 1

* Added nack invalid hash function

* Added NACK cases

* fixing build error

* fixing build

* small warning

* adding regex test

* Adding policy tests

* fixing warning

* fixing warning

* fixing code reivew comments.

* fixing IDLE case

* Code review comments.

* fixing code review comments

* Making a helper function

* fixing reattempt case

* Added afew more tests.

* Adding more tests

* Added backward compatible test

* FIxing the reattempt test

* Clean up

* fixing clang error

* fixing clang error

* Fix logic discovered during code review

* code review comments

* code review comments

* code review comment

* clean up tests

* fixing code review comments

* clean up tests

* Separated test

* Fixing test

* fixing test

* fixing clang error

* Addressing code review suggestions

* Fixing last bit of code review comments

* Fixing flaky tests

* Fixing last bit of code review comments

* clean debugs

* Remove a verbose log
pull/26238/head
donnadionne 4 years ago committed by GitHub
parent 98ccb7fd94
commit 2aefb26f5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      BUILD
  2. 1
      CMakeLists.txt
  3. 2
      Makefile
  4. 3
      build_autogenerated.yaml
  5. 1
      doc/environment_variables.md
  6. 1
      grpc.gyp
  7. 759
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  8. 10
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
  9. 13
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  10. 78
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  11. 26
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  12. 186
      src/core/ext/xds/xds_api.cc
  13. 2
      src/core/ext/xds/xds_api.h
  14. 4
      src/core/plugin_registry/grpc_plugin_registry.cc
  15. 4
      src/core/plugin_registry/grpc_unsecure_plugin_registry.cc
  16. 38
      src/proto/grpc/testing/xds/v3/cluster.proto
  17. 11
      src/proto/grpc/testing/xds/v3/endpoint.proto
  18. 5
      src/proto/grpc/testing/xds/v3/regex.proto
  19. 72
      src/proto/grpc/testing/xds/v3/route.proto
  20. 928
      test/cpp/end2end/xds_end2end_test.cc

@ -1105,6 +1105,7 @@ grpc_cc_library(
"grpc_client_authority_filter",
"grpc_lb_policy_pick_first",
"grpc_lb_policy_priority",
"grpc_lb_policy_ring_hash",
"grpc_lb_policy_round_robin",
"grpc_lb_policy_weighted_target",
"grpc_client_idle_filter",
@ -1544,6 +1545,7 @@ grpc_cc_library(
"grpc_base",
"grpc_client_channel",
"grpc_lb_address_filtering",
"grpc_lb_policy_ring_hash",
"grpc_lb_xds_channel_args",
"grpc_lb_xds_common",
"grpc_resolver_fake",
@ -1636,6 +1638,10 @@ grpc_cc_library(
hdrs = [
"src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h",
],
external_deps = [
"absl/strings",
"xxhash",
],
language = "c++",
deps = [
"grpc_base",

@ -2326,6 +2326,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
src/core/ext/filters/client_channel/lb_policy_registry.cc

@ -1735,6 +1735,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \
src/core/ext/filters/client_channel/lb_policy_registry.cc \
@ -2669,7 +2670,6 @@ ifneq ($(OPENSSL_DEP),)
# installing headers to their final destination on the drive. We need this
# otherwise parallel compilation will fail if a source is compiled first.
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP)
src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP)

@ -1618,6 +1618,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
- src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
- src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
- src/core/ext/filters/client_channel/lb_policy_factory.h
- src/core/ext/filters/client_channel/lb_policy_registry.h
@ -1828,6 +1829,7 @@ libs:
- src/core/lib/transport/transport.h
- src/core/lib/transport/transport_impl.h
- src/core/lib/uri/uri_parser.h
- third_party/xxhash/xxhash.h
src:
- src/core/ext/filters/census/grpc_context.cc
- src/core/ext/filters/client_channel/backend_metric.cc
@ -1854,6 +1856,7 @@ libs:
- src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
- src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
- src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
- src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
- src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
- src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
- src/core/ext/filters/client_channel/lb_policy_registry.cc

@ -75,6 +75,7 @@ some configuration as environment variables that can be set.
in DEBUG)
- priority_lb - traces priority LB policy
- resource_quota - trace resource quota objects internals
- ring_hash_lb - traces the ring hash load balancing policy
- round_robin - traces the round_robin load balancing policy
- queue_pluck
- server_channel - lightweight trace of significant server channel events

@ -1122,6 +1122,7 @@
'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc',
'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc',
'src/core/ext/filters/client_channel/lb_policy_registry.cc',

@ -16,8 +16,767 @@
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include <string.h>
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
#include <grpc/support/alloc.h>
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/static_metadata.h"
namespace grpc_core {
const char* kRequestRingHashAttribute = "request_ring_hash";
TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb");
// Helper Parser method
void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
size_t* max_ring_size,
std::vector<grpc_error_handle>* error_list) {
*min_ring_size = 1024;
*max_ring_size = 8388608;
if (json.type() != Json::Type::OBJECT) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"ring_hash_experimental should be of type object"));
return;
}
const Json::Object& ring_hash = json.object_value();
auto ring_hash_it = ring_hash.find("min_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size error: should be of type number"));
} else {
*min_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
}
ring_hash_it = ring_hash.find("max_ring_size");
if (ring_hash_it != ring_hash.end()) {
if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size error: should be of type number"));
} else {
*max_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
}
if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 ||
*max_ring_size > 8388608 || *min_ring_size > *max_ring_size) {
error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size and or min_ring_size error: "
"values need to be in the range of 1 to 8388608 "
"and max_ring_size cannot be smaller than "
"min_ring_size"));
}
}
namespace {
constexpr char kRingHash[] = "ring_hash_experimental";
class RingHashLbConfig : public LoadBalancingPolicy::Config {
public:
RingHashLbConfig(size_t min_ring_size, size_t max_ring_size)
: min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {}
const char* name() const override { return kRingHash; }
size_t min_ring_size() const { return min_ring_size_; }
size_t max_ring_size() const { return max_ring_size_; }
private:
size_t min_ring_size_;
size_t max_ring_size_;
};
//
// ring_hash LB policy
//
class RingHash : public LoadBalancingPolicy {
public:
explicit RingHash(Args args);
const char* name() const override { return kRingHash; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
~RingHash() override;
// Forward declaration.
class RingHashSubchannelList;
// Data for a particular subchannel in a subchannel list.
// This subclass adds the following functionality:
// - Tracks the previous connectivity state of the subchannel, so that
// we know how many subchannels are in each state.
class RingHashSubchannelData
: public SubchannelData<RingHashSubchannelList, RingHashSubchannelData> {
public:
RingHashSubchannelData(
SubchannelList<RingHashSubchannelList, RingHashSubchannelData>*
subchannel_list,
const ServerAddress& address,
RefCountedPtr<SubchannelInterface> subchannel)
: SubchannelData(subchannel_list, address, std::move(subchannel)),
address_(address) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
}
const ServerAddress& address() const { return address_; }
bool seen_failure_since_ready() const { return seen_failure_since_ready_; }
// Performs connectivity state updates that need to be done both when we
// first start watching and when a watcher notification is received.
void UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state);
private:
// Performs connectivity state updates that need to be done only
// after we have started watching.
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
ServerAddress address_;
grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
bool seen_failure_since_ready_ = false;
};
// A list of subchannels.
class RingHashSubchannelList
: public SubchannelList<RingHashSubchannelList, RingHashSubchannelData> {
public:
RingHashSubchannelList(RingHash* policy, TraceFlag* tracer,
ServerAddressList addresses,
const grpc_channel_args& args)
: SubchannelList(policy, tracer, std::move(addresses),
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
}
~RingHashSubchannelList() override {
RingHash* p = static_cast<RingHash*>(policy());
p->Unref(DEBUG_LOCATION, "subchannel_list");
}
// Starts watching the subchannels in this list.
void StartWatchingLocked();
// Updates the counters of subchannels in each state when a
// subchannel transitions from old_state to new_state.
void UpdateStateCountersLocked(grpc_connectivity_state old_state,
grpc_connectivity_state new_state);
// Updates the RH policy's connectivity state based on the
// subchannel list's state counters, creating new picker and new ring.
// Furthermore, return a bool indicating whether the aggregated state is
// Transient Failure.
bool UpdateRingHashConnectivityStateLocked();
private:
size_t num_idle_ = 0;
size_t num_ready_ = 0;
size_t num_connecting_ = 0;
size_t num_transient_failure_ = 0;
};
class Picker : public SubchannelPicker {
public:
Picker(RefCountedPtr<RingHash> parent,
RingHashSubchannelList* subchannel_list);
PickResult Pick(PickArgs args) override;
private:
struct RingEntry {
uint64_t hash;
RefCountedPtr<SubchannelInterface> subchannel;
grpc_connectivity_state connectivity_state;
};
// A fire-and-forget class that schedules subchannel connection attempts
// on the control plane WorkSerializer.
class SubchannelConnectionAttempter : public Orphanable {
public:
explicit SubchannelConnectionAttempter(
RefCountedPtr<RingHash> ring_hash_lb)
: ring_hash_lb_(std::move(ring_hash_lb)) {
GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
}
void AddSubchannel(RefCountedPtr<SubchannelInterface> subchannel) {
subchannels_.push_back(std::move(subchannel));
}
void Orphan() override {
// Hop into ExecCtx, so that we're not holding the data plane mutex
// while we run control-plane code.
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
private:
static void RunInExecCtx(void* arg, grpc_error* /*error*/) {
auto* self = static_cast<SubchannelConnectionAttempter*>(arg);
self->ring_hash_lb_->work_serializer()->Run(
[self]() {
if (!self->ring_hash_lb_->shutdown_) {
for (auto& subchannel : self->subchannels_) {
subchannel->AttemptToConnect();
}
}
delete self;
},
DEBUG_LOCATION);
}
RefCountedPtr<RingHash> ring_hash_lb_;
grpc_closure closure_;
absl::InlinedVector<RefCountedPtr<SubchannelInterface>, 10> subchannels_;
};
RefCountedPtr<RingHash> parent_;
// A ring of subchannels.
std::vector<RingEntry> ring_;
};
void ShutdownLocked() override;
// Current config from resolver.
RefCountedPtr<RingHashLbConfig> config_;
// list of subchannels.
OrphanablePtr<RingHashSubchannelList> subchannel_list_;
// indicating if we are shutting down.
bool shutdown_ = false;
};
//
// RingHash::Picker
//
RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
RingHashSubchannelList* subchannel_list)
: parent_(std::move(parent)) {
size_t num_subchannels = subchannel_list->num_subchannels();
// Store the weights while finding the sum.
struct AddressWeight {
std::string address;
// Default weight is 1 for the cases where a weight is not provided,
// each occurrence of the address will be counted a weight value of 1.
uint32_t weight = 1;
double normalized_weight;
};
std::vector<AddressWeight> address_weights;
size_t sum = 0;
address_weights.reserve(num_subchannels);
for (size_t i = 0; i < num_subchannels; ++i) {
RingHashSubchannelData* sd = subchannel_list->subchannel(i);
const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(sd->address().GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
AddressWeight address_weight;
address_weight.address =
grpc_sockaddr_to_string(&sd->address().address(), false);
if (weight_attribute != nullptr) {
GPR_ASSERT(weight_attribute->weight() != 0);
address_weight.weight = weight_attribute->weight();
}
sum += address_weight.weight;
address_weights.push_back(std::move(address_weight));
}
// Calculating normalized weights and find min and max.
double min_normalized_weight = 1.0;
double max_normalized_weight = 0.0;
for (auto& address : address_weights) {
address.normalized_weight = static_cast<double>(address.weight) / sum;
min_normalized_weight =
std::min(address.normalized_weight, min_normalized_weight);
max_normalized_weight =
std::max(address.normalized_weight, max_normalized_weight);
}
// Scale up the number of hashes per host such that the least-weighted host
// gets a whole number of hashes on the ring. Other hosts might not end up
// with whole numbers, and that's fine (the ring-building algorithm below can
// handle this). This preserves the original implementation's behavior: when
// weights aren't provided, all hosts should get an equal number of hashes. In
// the case where this number exceeds the max_ring_size, it's scaled back down
// to fit.
const size_t min_ring_size = parent_->config_->min_ring_size();
const size_t max_ring_size = parent_->config_->max_ring_size();
const double scale = std::min(
std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
static_cast<double>(max_ring_size));
// Reserve memory for the entire ring up front.
const uint64_t ring_size = std::ceil(scale);
ring_.reserve(ring_size);
// Populate the hash ring by walking through the (host, weight) pairs in
// normalized_host_weights, and generating (scale * weight) hashes for each
// host. Since these aren't necessarily whole numbers, we maintain running
// sums -- current_hashes and target_hashes -- which allows us to populate the
// ring in a mostly stable way.
absl::InlinedVector<char, 196> hash_key_buffer;
double current_hashes = 0.0;
double target_hashes = 0.0;
uint64_t min_hashes_per_host = ring_size;
uint64_t max_hashes_per_host = 0;
for (size_t i = 0; i < num_subchannels; ++i) {
const std::string& address_string = address_weights[i].address;
hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.emplace_back('_');
auto offset_start = hash_key_buffer.end();
target_hashes += scale * address_weights[i].normalized_weight;
size_t count = 0;
auto current_state =
subchannel_list->subchannel(i)->subchannel()->CheckConnectivityState();
while (current_hashes < target_hashes) {
const std::string count_str = absl::StrCat(count);
hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end());
absl::string_view hash_key(hash_key_buffer.data(),
hash_key_buffer.size());
const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0);
ring_.push_back({hash,
subchannel_list->subchannel(i)->subchannel()->Ref(),
current_state});
++count;
++current_hashes;
hash_key_buffer.erase(offset_start, hash_key_buffer.end());
}
min_hashes_per_host =
std::min(static_cast<uint64_t>(i), min_hashes_per_host);
max_hashes_per_host =
std::max(static_cast<uint64_t>(i), max_hashes_per_host);
}
std::sort(ring_.begin(), ring_.end(),
[](const RingEntry& lhs, const RingEntry& rhs) -> bool {
return lhs.hash < rhs.hash;
});
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p picker %p] created picker from subchannel_list=%p "
"with %" PRIuPTR " ring entries",
parent_.get(), this, subchannel_list, ring_.size());
// for (const auto& r : ring_) {
// gpr_log(GPR_INFO, "donn ring hash: %" PRIx64 " subchannel: %p state: %d",
// r.hash, r.subchannel.get(), r.connectivity_state);
//}
}
}
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
PickResult result;
// Initialize to PICK_FAILED.
result.type = PickResult::PICK_FAILED;
auto hash =
args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
uint64_t h;
if (!absl::SimpleAtoi(hash, &h)) {
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds ring hash value is not a number").c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
// (ketama_get_server) NOTE: The algorithm depends on using signed integers
// for lowp, highp, and first_index. Do not change them!
int64_t lowp = 0;
int64_t highp = ring_.size();
int64_t first_index = 0;
while (true) {
first_index = (lowp + highp) / 2;
if (first_index == static_cast<int64_t>(ring_.size())) {
first_index = 0;
break;
}
uint64_t midval = ring_[first_index].hash;
uint64_t midval1 = first_index == 0 ? 0 : ring_[first_index - 1].hash;
if (h <= midval && h > midval1) {
break;
}
if (midval < h) {
lowp = first_index + 1;
} else {
highp = first_index - 1;
}
if (lowp > highp) {
first_index = 0;
break;
}
}
OrphanablePtr<SubchannelConnectionAttempter> subchannel_connection_attempter;
auto ScheduleSubchannelConnectionAttempt =
[&](RefCountedPtr<SubchannelInterface> subchannel) {
if (subchannel_connection_attempter == nullptr) {
subchannel_connection_attempter =
MakeOrphanable<SubchannelConnectionAttempter>(parent_);
}
subchannel_connection_attempter->AddSubchannel(std::move(subchannel));
};
switch (ring_[first_index].connectivity_state) {
case GRPC_CHANNEL_READY:
result.type = PickResult::PICK_COMPLETE;
result.subchannel = ring_[first_index].subchannel;
return result;
case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
// fallthrough
case GRPC_CHANNEL_CONNECTING:
result.type = PickResult::PICK_QUEUE;
return result;
default: // GRPC_CHANNEL_TRANSIENT_FAILURE
break;
}
ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
// Loop through remaining subchannels to find one in READY.
// On the way, we make sure the right set of connection attempts
// will happen.
bool found_second_subchannel = false;
bool found_first_non_failed = false;
for (size_t i = 1; i < ring_.size(); ++i) {
const RingEntry& entry = ring_[(first_index + i) % ring_.size()];
if (entry.subchannel == ring_[first_index].subchannel) {
continue;
}
if (entry.connectivity_state == GRPC_CHANNEL_READY) {
result.type = PickResult::PICK_COMPLETE;
result.subchannel = entry.subchannel;
return result;
}
if (!found_second_subchannel) {
switch (entry.connectivity_state) {
case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt(entry.subchannel);
// fallthrough
case GRPC_CHANNEL_CONNECTING:
result.type = PickResult::PICK_QUEUE;
return result;
default:
break;
}
found_second_subchannel = true;
}
if (!found_first_non_failed) {
if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
ScheduleSubchannelConnectionAttempt(entry.subchannel);
} else {
if (entry.connectivity_state == GRPC_CHANNEL_IDLE) {
ScheduleSubchannelConnectionAttempt(entry.subchannel);
}
found_first_non_failed = true;
}
}
}
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds ring hash found a subchannel "
"that is in TRANSIENT_FAILURE state")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
}
//
// RingHash::RingHashSubchannelList
//
void RingHash::RingHashSubchannelList::StartWatchingLocked() {
if (num_subchannels() == 0) return;
// Check current state of each subchannel synchronously.
for (size_t i = 0; i < num_subchannels(); ++i) {
grpc_connectivity_state state =
subchannel(i)->CheckConnectivityStateLocked();
subchannel(i)->UpdateConnectivityStateLocked(state);
}
// Start connectivity watch for each subchannel.
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
}
}
RingHash* p = static_cast<RingHash*>(policy());
// Sending up the initial picker while all subchannels are in IDLE state.
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
}
void RingHash::RingHashSubchannelList::UpdateStateCountersLocked(
grpc_connectivity_state old_state, grpc_connectivity_state new_state) {
GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
if (old_state == GRPC_CHANNEL_IDLE) {
GPR_ASSERT(num_idle_ > 0);
--num_idle_;
} else if (old_state == GRPC_CHANNEL_READY) {
GPR_ASSERT(num_ready_ > 0);
--num_ready_;
} else if (old_state == GRPC_CHANNEL_CONNECTING) {
GPR_ASSERT(num_connecting_ > 0);
--num_connecting_;
} else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
GPR_ASSERT(num_transient_failure_ > 0);
--num_transient_failure_;
}
if (new_state == GRPC_CHANNEL_IDLE) {
++num_idle_;
} else if (new_state == GRPC_CHANNEL_READY) {
++num_ready_;
} else if (new_state == GRPC_CHANNEL_CONNECTING) {
++num_connecting_;
} else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
++num_transient_failure_;
}
}
// Sets the RH policy's connectivity state and generates a new picker based
// on the current subchannel list or requests an re-attempt by returning true..
bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
RingHash* p = static_cast<RingHash*>(policy());
// Only set connectivity state if this is the current subchannel list.
if (p->subchannel_list_.get() != this) return false;
// The overall aggregation rules here are:
// 1. If there is at least one subchannel in READY state, report READY.
// 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report
// TRANSIENT_FAILURE.
// 3. If there is at least one subchannel in CONNECTING state, report
// CONNECTING.
// 4. If there is at least one subchannel in IDLE state, report IDLE.
// 5. Otherwise, report TRANSIENT_FAILURE.
if (num_ready_ > 0) {
/* READY */
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_READY, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
return false;
}
if (num_connecting_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
return false;
}
if (num_idle_ > 0 && num_transient_failure_ < 2) {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE, absl::Status(),
absl::make_unique<Picker>(p->Ref(DEBUG_LOCATION, "RingHashPicker"),
this));
return false;
}
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to backend failing or idle"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
return true;
}
//
// RingHash::RingHashSubchannelData
//
void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked(
grpc_connectivity_state connectivity_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(
GPR_INFO,
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
"(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
p, subchannel(), subchannel_list(), Index(),
subchannel_list()->num_subchannels(),
ConnectivityStateName(last_connectivity_state_),
ConnectivityStateName(connectivity_state));
}
// Decide what state to report for aggregation purposes.
// If we haven't seen a failure since the last time we were in state
// READY, then we report the state change as-is. However, once we do see
// a failure, we report TRANSIENT_FAILURE and do not report any subsequent
// state changes until we go back into state READY.
if (!seen_failure_since_ready_) {
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
seen_failure_since_ready_ = true;
}
subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
connectivity_state);
} else {
if (connectivity_state == GRPC_CHANNEL_READY) {
seen_failure_since_ready_ = false;
subchannel_list()->UpdateStateCountersLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state);
}
}
// Record last seen connectivity state.
last_connectivity_state_ = connectivity_state;
}
void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) {
RingHash* p = static_cast<RingHash*>(subchannel_list()->policy());
GPR_ASSERT(subchannel() != nullptr);
// If the new state is TRANSIENT_FAILURE, re-resolve.
// Only do this if we've started watching, not at startup time.
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
// Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
"Requesting re-resolution",
p, subchannel());
}
p->channel_control_helper()->RequestReresolution();
}
// Update state counters.
UpdateConnectivityStateLocked(connectivity_state);
// Update the RH policy's connectivity state, creating new picker and new
// ring.
bool transient_failure =
subchannel_list()->UpdateRingHashConnectivityStateLocked();
// While the ring_hash policy is reporting TRANSIENT_FAILURE, it will
// not be getting any pick requests from the priority policy.
// However, because the ring_hash policy does not attempt to
// reconnect to subchannels unless it is getting pick requests,
// it will need special handling to ensure that it will eventually
// recover from TRANSIENT_FAILURE state once the problem is resolved.
// Specifically, it will make sure that it is attempting to connect to
// at least one subchannel at any given time. After a given subchannel
// fails a connection attempt, it will move on to the next subchannel
// in the ring. It will keep doing this until one of the subchannels
// successfully connects, at which point it will report READY and stop
// proactively trying to connect. The policy will remain in
// TRANSIENT_FAILURE until at least one subchannel becomes connected,
// even if subchannels are in state CONNECTING during that time.
if (transient_failure &&
connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
next_sd->subchannel()->AttemptToConnect();
}
}
//
// RingHash
//
RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] Created", this);
}
}
RingHash::~RingHash() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
}
GPR_ASSERT(subchannel_list_ == nullptr);
}
void RingHash::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
}
shutdown_ = true;
subchannel_list_.reset();
}
void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
void RingHash::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses.size());
}
config_ = std::move(args.config);
// Filter out any address with weight 0.
ServerAddressList addresses;
addresses.reserve(args.addresses.size());
for (ServerAddress& address : args.addresses) {
const ServerAddressWeightAttribute* weight_attribute =
static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
addresses.push_back(std::move(address));
}
}
subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
if (subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately transition to TRANSIENT_FAILURE.
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
} else {
// Start watching the new list.
subchannel_list_->StartWatchingLocked();
}
}
//
// factory
//
class RingHashFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<RingHash>(std::move(args));
}
const char* name() const override { return kRingHash; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& json, grpc_error** error) const override {
size_t min_ring_size;
size_t max_ring_size;
std::vector<grpc_error_handle> error_list;
ParseRingHashLbConfig(json, &min_ring_size, &max_ring_size, &error_list);
if (error_list.empty()) {
return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
} else {
*error = GRPC_ERROR_CREATE_FROM_VECTOR(
"ring_hash_experimental LB policy config", &error_list);
return nullptr;
}
}
};
} // namespace
void GrpcLbPolicyRingHashInit() {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
absl::make_unique<grpc_core::RingHashFactory>());
}
void GrpcLbPolicyRingHashShutdown() {}
} // namespace grpc_core

@ -19,9 +19,19 @@
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/json/json.h"
namespace grpc_core {
extern const char* kRequestRingHashAttribute;
// Helper Parsing method to parse ring hash policy configs; for example, ring
// hash size validity.
void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
size_t* max_ring_size,
std::vector<grpc_error_handle>* error_list);
} // namespace grpc_core
#endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_RING_HASH_RING_HASH_H

@ -452,22 +452,9 @@ void CdsLb::OnClusterChanged(const std::string& name,
// Construct config for child policy.
Json::Object xds_lb_policy;
if (cluster_data.lb_policy == "RING_HASH") {
std::string hash_function;
switch (cluster_data.hash_function) {
case XdsApi::CdsUpdate::HashFunction::XX_HASH:
hash_function = "XX_HASH";
break;
case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
hash_function = "MURMUR_HASH_2";
break;
default:
GPR_ASSERT(0);
break;
}
xds_lb_policy["RING_HASH"] = Json::Object{
{"min_ring_size", cluster_data.min_ring_size},
{"max_ring_size", cluster_data.max_ring_size},
{"hash_function", hash_function},
};
} else {
xds_lb_policy["ROUND_ROBIN"] = Json::Object();

@ -28,6 +28,7 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
@ -834,6 +835,13 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
std::vector<std::string> hierarchical_path = {
priority_child_name, locality_name->AsHumanReadableString()};
for (const auto& endpoint : locality.endpoints) {
const ServerAddressWeightAttribute* weight_attribute = static_cast<
const ServerAddressWeightAttribute*>(endpoint.GetAttribute(
ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
uint32_t weight = locality.lb_weight;
if (weight_attribute != nullptr) {
weight = locality.lb_weight * weight_attribute->weight();
}
addresses.emplace_back(
endpoint
.WithAttribute(kHierarchicalPathAttributeKey,
@ -841,10 +849,10 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
.WithAttribute(kXdsLocalityNameAttributeKey,
absl::make_unique<XdsLocalityAttribute>(
locality_name->Ref()))
.WithAttribute(ServerAddressWeightAttribute::
kServerAddressWeightAttributeKey,
absl::make_unique<ServerAddressWeightAttribute>(
locality.lb_weight)));
.WithAttribute(
ServerAddressWeightAttribute::
kServerAddressWeightAttributeKey,
absl::make_unique<ServerAddressWeightAttribute>(weight)));
}
}
}
@ -1201,65 +1209,11 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
}
policy_it = policy.find("RING_HASH");
if (policy_it != policy.end()) {
if (policy_it->second.type() != Json::Type::OBJECT) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:RING_HASH error:type should be object"));
continue;
}
// TODO(donnadionne): Move this to a method in
// ring_hash_experimental and call it here.
const Json::Object& ring_hash = policy_it->second.object_value();
xds_lb_policy = array[i];
size_t min_ring_size = 1024;
size_t max_ring_size = 8388608;
auto ring_hash_it = ring_hash.find("min_ring_size");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size missing"));
} else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:min_ring_size error: should be of "
"number"));
} else {
min_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
ring_hash_it = ring_hash.find("max_ring_size");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size missing"));
} else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size error: should be of "
"number"));
} else {
max_ring_size = gpr_parse_nonnegative_int(
ring_hash_it->second.string_value().c_str());
}
if (min_ring_size <= 0 || min_ring_size > 8388608 ||
max_ring_size <= 0 || max_ring_size > 8388608 ||
min_ring_size > max_ring_size) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:max_ring_size and or min_ring_size error: "
"values need to be in the range of 1 to 8388608 "
"and max_ring_size cannot be smaller than "
"min_ring_size"));
}
ring_hash_it = ring_hash.find("hash_function");
if (ring_hash_it == ring_hash.end()) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function missing"));
} else if (ring_hash_it->second.type() != Json::Type::STRING) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function error: should be a "
"string"));
} else if (ring_hash_it->second.string_value() != "XX_HASH" &&
ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"field:hash_function error: unsupported "
"hash_function"));
}
break;
size_t min_ring_size;
size_t max_ring_size;
ParseRingHashLbConfig(policy_it->second, &min_ring_size,
&max_ring_size, &error_list);
}
}
}

@ -568,6 +568,9 @@ absl::optional<uint64_t> HeaderHashHelper(
std::string value_buffer;
absl::optional<absl::string_view> header_value =
GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
if (!header_value.has_value()) {
return absl::nullopt;
}
if (policy.regex != nullptr) {
// If GetHeaderValue() did not already store the value in
// value_buffer, copy it there now, so we can modify it.
@ -649,10 +652,13 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
case XdsApi::Route::HashPolicy::HEADER:
new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
break;
case XdsApi::Route::HashPolicy::CHANNEL_ID:
new_hash =
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver));
case XdsApi::Route::HashPolicy::CHANNEL_ID: {
std::string address_str = absl::StrFormat(
"%" PRIu64,
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver)));
new_hash = XXH64(address_str.c_str(), address_str.length(), 0);
break;
}
default:
GPR_ASSERT(0);
}
@ -671,7 +677,11 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
}
if (!hash.has_value()) {
// If there is no hash, we just choose a random value as a default.
hash = rand();
// We cannot directly use the result of rand() as the hash value,
// since it is a 32-bit number and not a 64-bit number and will
// therefore not be evenly distributed.
std::string rand_str = absl::StrCat(rand());
hash = XXH64(rand_str.c_str(), rand_str.length(), 0);
}
CallConfig call_config;
if (method_config != nullptr) {
@ -680,8 +690,12 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
call_config.service_config = std::move(method_config);
}
call_config.call_attributes[kXdsClusterAttribute] = it->first;
call_config.call_attributes[kRequestRingHashAttribute] =
absl::StrFormat("%" PRIu64, hash.value());
std::string hash_string = absl::StrCat(hash.value());
char* hash_value =
static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
memcpy(hash_value, hash_string.c_str(), hash_string.size());
hash_value[hash_string.size()] = '\0';
call_config.call_attributes[kRequestRingHashAttribute] = hash_value;
call_config.on_call_committed = [resolver, cluster_state]() {
cluster_state->Unref();
ExecCtx::Run(

@ -1605,40 +1605,35 @@ grpc_error_handle RouteActionParse(const EncodingContext& context,
regex_rewrite =
envoy_config_route_v3_RouteAction_HashPolicy_Header_regex_rewrite(
header);
if (regex_rewrite == nullptr) {
gpr_log(
GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier Header with "
"RegexMatchAndSubstitution but Regex is missing");
continue;
}
const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
regex_rewrite);
if (regex_matcher == nullptr) {
gpr_log(
GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier Header with "
"RegexMatchAndSubstitution but RegexMatcher pattern is "
"missing");
continue;
}
RE2::Options options;
policy.regex = absl::make_unique<RE2>(
UpbStringToStdString(
envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
options);
if (!policy.regex->ok()) {
gpr_log(
GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier Header with "
"RegexMatchAndSubstitution but RegexMatcher pattern does not "
"compile");
continue;
if (regex_rewrite != nullptr) {
const envoy_type_matcher_v3_RegexMatcher* regex_matcher =
envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern(
regex_rewrite);
if (regex_matcher == nullptr) {
gpr_log(
GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier Header with "
"RegexMatchAndSubstitution but RegexMatcher pattern is "
"missing");
continue;
}
RE2::Options options;
policy.regex = absl::make_unique<RE2>(
UpbStringToStdString(
envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)),
options);
if (!policy.regex->ok()) {
gpr_log(
GPR_DEBUG,
"RouteAction HashPolicy contains policy specifier Header with "
"RegexMatchAndSubstitution but RegexMatcher pattern does not "
"compile");
continue;
}
policy.regex_substitution = UpbStringToStdString(
envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
regex_rewrite));
}
policy.regex_substitution = UpbStringToStdString(
envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution(
regex_rewrite));
} else if ((filter_state =
envoy_config_route_v3_RouteAction_HashPolicy_filter_state(
hash_policy)) != nullptr) {
@ -2815,75 +2810,61 @@ grpc_error_handle CdsResponseParse(
// Record ring hash lb config
auto* ring_hash_config =
envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
if (ring_hash_config == nullptr) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name,
": ring hash lb config required but not present.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
const google_protobuf_UInt64Value* max_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
ring_hash_config);
if (max_ring_size != nullptr) {
cds_update.max_ring_size =
google_protobuf_UInt64Value_value(max_ring_size);
if (cds_update.max_ring_size > 8388608 ||
cds_update.max_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": max_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
if (ring_hash_config != nullptr) {
const google_protobuf_UInt64Value* max_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
ring_hash_config);
if (max_ring_size != nullptr) {
cds_update.max_ring_size =
google_protobuf_UInt64Value_value(max_ring_size);
if (cds_update.max_ring_size > 8388608 ||
cds_update.max_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": max_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
}
const google_protobuf_UInt64Value* min_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
ring_hash_config);
if (min_ring_size != nullptr) {
cds_update.min_ring_size =
google_protobuf_UInt64Value_value(min_ring_size);
if (cds_update.min_ring_size > 8388608 ||
cds_update.min_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
const google_protobuf_UInt64Value* min_ring_size =
envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
ring_hash_config);
if (min_ring_size != nullptr) {
cds_update.min_ring_size =
google_protobuf_UInt64Value_value(min_ring_size);
if (cds_update.min_ring_size > 8388608 ||
cds_update.min_ring_size == 0) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size is not in the range of 1 to 8388608.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
if (cds_update.min_ring_size > cds_update.max_ring_size) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size cannot be greater than max_ring_size.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
if (cds_update.min_ring_size > cds_update.max_ring_size) {
if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
ring_hash_config) !=
envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(
cluster_name,
": min_ring_size cannot be greater than max_ring_size.")
absl::StrCat(cluster_name,
": ring hash lb config has invalid hash function.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
}
if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
ring_hash_config) ==
envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH;
} else if (
envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
ring_hash_config) ==
envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) {
cds_update.hash_function =
XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2;
} else {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name,
": ring hash lb config has invalid hash function.")
.c_str()));
resource_names_failed->insert(cluster_name);
continue;
}
} else {
errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat(cluster_name, ": LB policy is not supported.").c_str()));
@ -3014,13 +2995,28 @@ grpc_error_handle ServerAddressParseAndAppend(
if (GPR_UNLIKELY(port >> 16) != 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port.");
}
// Find load_balancing_weight for the endpoint.
const google_protobuf_UInt32Value* load_balancing_weight =
envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
const int32_t weight =
load_balancing_weight != nullptr
? google_protobuf_UInt32Value_value(load_balancing_weight)
: 500;
if (weight == 0) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Invalid endpoint weight of 0.");
}
// Populate grpc_resolved_address.
grpc_resolved_address addr;
grpc_error_handle error =
grpc_string_to_sockaddr(&addr, address_str.c_str(), port);
if (error != GRPC_ERROR_NONE) return error;
// Append the address to the list.
list->emplace_back(addr, nullptr);
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>
attributes;
attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] =
absl::make_unique<ServerAddressWeightAttribute>(weight);
list->emplace_back(addr, nullptr, std::move(attributes));
return GRPC_ERROR_NONE;
}

@ -411,8 +411,6 @@ class XdsApi {
// Used for RING_HASH LB policy only.
uint64_t min_ring_size = 1024;
uint64_t max_ring_size = 8388608;
enum HashFunction { XX_HASH, MURMUR_HASH_2 };
HashFunction hash_function;
// Maximum number of outstanding requests can be made to the upstream
// cluster.
uint32_t max_concurrent_requests = 1024;

@ -63,6 +63,8 @@ void grpc_workaround_cronet_compression_filter_shutdown(void);
namespace grpc_core {
void FaultInjectionFilterInit(void);
void FaultInjectionFilterShutdown(void);
void GrpcLbPolicyRingHashInit(void);
void GrpcLbPolicyRingHashShutdown(void);
} // namespace grpc_core
#ifndef GRPC_NO_XDS
@ -115,6 +117,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown);
grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
grpc_core::GrpcLbPolicyRingHashShutdown);
grpc_register_plugin(grpc_resolver_dns_ares_init,
grpc_resolver_dns_ares_shutdown);
grpc_register_plugin(grpc_resolver_dns_native_init,

@ -57,6 +57,8 @@ void grpc_message_size_filter_shutdown(void);
namespace grpc_core {
void FaultInjectionFilterInit(void);
void FaultInjectionFilterShutdown(void);
void GrpcLbPolicyRingHashInit(void);
void GrpcLbPolicyRingHashShutdown(void);
} // namespace grpc_core
void grpc_service_config_channel_arg_filter_init(void);
void grpc_service_config_channel_arg_filter_shutdown(void);
@ -94,6 +96,8 @@ void grpc_register_built_in_plugins(void) {
grpc_lb_policy_pick_first_shutdown);
grpc_register_plugin(grpc_lb_policy_round_robin_init,
grpc_lb_policy_round_robin_shutdown);
grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit,
grpc_core::GrpcLbPolicyRingHashShutdown);
grpc_register_plugin(grpc_client_idle_filter_init,
grpc_client_idle_filter_shutdown);
grpc_register_plugin(grpc_max_age_filter_init,

@ -153,12 +153,50 @@ message Cluster {
// Configuration to use for EDS updates for the Cluster.
EdsClusterConfig eds_cluster_config = 3;
// Specific configuration for the :ref:`RingHash<arch_overview_load_balancing_types_ring_hash>`
// load balancing policy.
message RingHashLbConfig {
// The hash function used to hash hosts onto the ketama ring.
enum HashFunction {
// Use `xxHash <https://github.com/Cyan4973/xxHash>`_, this is the default hash function.
XX_HASH = 0;
MURMUR_HASH_2 = 1;
}
reserved 2;
// Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each
// provided host) the better the request distribution will reflect the desired weights. Defaults
// to 1024 entries, and limited to 8M entries. See also
// :ref:`maximum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.maximum_ring_size>`.
google.protobuf.UInt64Value minimum_ring_size = 1;
// The hash function used to hash hosts onto the ketama ring. The value defaults to
// :ref:`XX_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.RingHashLbConfig.HashFunction.XX_HASH>`.
HashFunction hash_function = 3;
// Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered
// to further constrain resource use. See also
// :ref:`minimum_ring_size<envoy_api_field_config.cluster.v3.Cluster.RingHashLbConfig.minimum_ring_size>`.
google.protobuf.UInt64Value maximum_ring_size = 4;
}
// The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
// when picking a host in the cluster.
LbPolicy lb_policy = 6;
CircuitBreakers circuit_breakers = 10;
// Optional configuration for the load balancing algorithm selected by
// LbPolicy. Currently only
// :ref:`RING_HASH<envoy_api_enum_value_config.cluster.v3.Cluster.LbPolicy.RING_HASH>`,
// Specifying ring_hash_lb_config without setting the corresponding
// LbPolicy will generate an error at runtime.
oneof lb_config {
// Optional configuration for the Ring Hash load balancing policy.
RingHashLbConfig ring_hash_lb_config = 23;
}
// Optional custom transport socket implementation to use for upstream connections.
// To setup TLS, set a transport socket with name `tls` and
// :ref:`UpstreamTlsContexts <envoy_api_msg_extensions.transport_sockets.tls.v3.UpstreamTlsContext>` in the `typed_config`.

@ -76,6 +76,17 @@ message LbEndpoint {
// Optional health status when known and supplied by EDS server.
HealthStatus health_status = 2;
// The optional load balancing weight of the upstream host; at least 1.
// Envoy uses the load balancing weight in some of the built in load
// balancers. The load balancing weight for an endpoint is divided by the sum
// of the weights of all endpoints in the endpoint's locality to produce a
// percentage of traffic for the endpoint. This percentage is then further
// weighted by the endpoint's locality's load balancing weight from
// LocalityLbEndpoints. If unspecified, each host is presumed to have equal
// weight in a locality. The sum of the weights of all endpoints in the
// endpoint's locality must not exceed uint32_t maximal value (4294967295).
google.protobuf.UInt32Value load_balancing_weight = 4;
}
// A group of endpoints belonging to a Locality.

@ -36,3 +36,8 @@ message RegexMatcher {
// The regex match string. The string must be supported by the configured engine.
string regex = 2;
}
message RegexMatchAndSubstitute {
RegexMatcher pattern = 1;
string substitution = 2;
}

@ -246,6 +246,78 @@ message RouteAction {
// for additional documentation.
WeightedCluster weighted_clusters = 3;
}
message HashPolicy {
message Header {
// The name of the request header that will be used to obtain the hash
// key. If the request header is not present, no hash will be produced.
string header_name = 1;
// If specified, the request header value will be rewritten and used
// to produce the hash key.
type.matcher.v3.RegexMatchAndSubstitute regex_rewrite = 2;
}
message Cookie {
string name = 1;
}
message ConnectionProperties {
bool source_ip = 1;
}
message QueryParameter {
string name = 1;
}
message FilterState {
// The name of the Object in the per-request filterState, which is an
// Envoy::Http::Hashable object. If there is no data associated with the key,
// or the stored object is not Envoy::Http::Hashable, no hash will be produced.
string key = 1;
}
oneof policy_specifier {
// Header hash policy.
Header header = 1;
// Cookie hash policy.
Cookie cookie = 2;
// Connection properties hash policy.
ConnectionProperties connection_properties = 3;
// Query parameter hash policy.
QueryParameter query_parameter = 5;
// Filter state hash policy.
FilterState filter_state = 6;
}
// The flag that short-circuits the hash computing. This field provides a
// 'fallback' style of configuration: "if a terminal policy doesn't work,
// fallback to rest of the policy list", it saves time when the terminal
// policy works.
//
// If true, and there is already a hash computed, ignore rest of the
// list of hash polices.
// For example, if the following hash methods are configured:
//
// ========= ========
// specifier terminal
// ========= ========
// Header A true
// Header B false
// Header C false
// ========= ========
//
// The generateHash process ends if policy "header A" generates a hash, as
// it's a terminal policy.
bool terminal = 4;
}
repeated HashPolicy hash_policy = 15;
// Specifies the maximum stream duration for this route.
MaxStreamDuration max_stream_duration = 36;
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save