From c55c7c065e271f58f70b660ea7c908e97df5d568 Mon Sep 17 00:00:00 2001 From: donnadionne Date: Tue, 18 May 2021 16:00:06 -0700 Subject: [PATCH] ring_hash LB policy implementation (#26285) * ring_hash LB policy (#25697) * Ring Hash Policy implementation * Code review comment fixing * Fixing code review comments. * Code review comment fixing * Fixing reconnect logic * adding helper method for pick * Holding on to ref to parent * first attempt at calling AttemptToConnect * Fixing state change * Fixing code review comments * Fixing the reconnect from channel watcher code * Fixing the BUILD to include new policy * Fixing major code review suggestion * Fixing code review comments * Fixing code review suggestions * Initial 2 tests. * Adding channel id case. * Fixing code review comments. * Small change to get the spread of backends * Add header hashing tests * Added more tests and debugging * Fixing Header hash * Added more tests * cleanup * removing debugs * Fixing code review comments. * code review fixing * combining code and match design * fixing code review comments. * Fixed IDLE case * Moving tests * Fixing code review comments * Adding more tests according to code review comments. * Added tests with differetn types of weights * Adding terminal policy case * Remove hash_func as there is only 1 * Added nack invalid hash function * Added NACK cases * fixing build error * fixing build * small warning * adding regex test * Adding policy tests * fixing warning * fixing warning * fixing code reivew comments. * fixing IDLE case * Code review comments. * fixing code review comments * Making a helper function * fixing reattempt case * Added afew more tests. 
* Adding more tests * Added backward compatible test * FIxing the reattempt test * Clean up * fixing clang error * fixing clang error * Fix logic discovered during code review * code review comments * code review comments * code review comment * clean up tests * fixing code review comments * clean up tests * Separated test * Fixing test * fixing test * fixing clang error * Addressing code review suggestions * Fixing last bit of code review comments * Fixing flaky tests * Fixing last bit of code review comments * clean debugs * Remove a verbose log * Relaxing deadline exceeded for 1st RPC until ring is optimized. Making Hash more efficient for random case. --- BUILD | 6 + CMakeLists.txt | 1 + Makefile | 2 +- build_autogenerated.yaml | 3 + doc/environment_variables.md | 1 + grpc.gyp | 1 + .../lb_policy/ring_hash/ring_hash.cc | 759 ++++++++++++++ .../lb_policy/ring_hash/ring_hash.h | 10 + .../client_channel/lb_policy/xds/cds.cc | 13 - .../lb_policy/xds/xds_cluster_resolver.cc | 78 +- .../resolver/xds/xds_resolver.cc | 27 +- src/core/ext/xds/xds_api.cc | 186 ++-- src/core/ext/xds/xds_api.h | 2 - .../plugin_registry/grpc_plugin_registry.cc | 4 + .../grpc_unsecure_plugin_registry.cc | 4 + src/proto/grpc/testing/xds/v3/cluster.proto | 38 + src/proto/grpc/testing/xds/v3/endpoint.proto | 11 + src/proto/grpc/testing/xds/v3/regex.proto | 5 + src/proto/grpc/testing/xds/v3/route.proto | 72 ++ test/cpp/end2end/xds_end2end_test.cc | 963 ++++++++++++++++-- 20 files changed, 1934 insertions(+), 252 deletions(-) diff --git a/BUILD b/BUILD index 34226c80de2..af813341781 100644 --- a/BUILD +++ b/BUILD @@ -1105,6 +1105,7 @@ grpc_cc_library( "grpc_client_authority_filter", "grpc_lb_policy_pick_first", "grpc_lb_policy_priority", + "grpc_lb_policy_ring_hash", "grpc_lb_policy_round_robin", "grpc_lb_policy_weighted_target", "grpc_client_idle_filter", @@ -1544,6 +1545,7 @@ grpc_cc_library( "grpc_base", "grpc_client_channel", "grpc_lb_address_filtering", + "grpc_lb_policy_ring_hash", 
"grpc_lb_xds_channel_args", "grpc_lb_xds_common", "grpc_resolver_fake", @@ -1636,6 +1638,10 @@ grpc_cc_library( hdrs = [ "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h", ], + external_deps = [ + "absl/strings", + "xxhash", + ], language = "c++", deps = [ "grpc_base", diff --git a/CMakeLists.txt b/CMakeLists.txt index 5887f7561f2..2e10b03cdca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2326,6 +2326,7 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/priority/priority.cc + src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc src/core/ext/filters/client_channel/lb_policy_registry.cc diff --git a/Makefile b/Makefile index 5abbc892158..c91e53a66d7 100644 --- a/Makefile +++ b/Makefile @@ -1735,6 +1735,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/priority/priority.cc \ + src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc \ src/core/ext/filters/client_channel/lb_policy_registry.cc \ @@ -2669,7 +2670,6 @@ ifneq ($(OPENSSL_DEP),) # installing headers to their final destination on the drive. We need this # otherwise parallel compilation will fail if a source is compiled first. 
src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.cc: $(OPENSSL_DEP) -src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/cds.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc: $(OPENSSL_DEP) src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc: $(OPENSSL_DEP) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 31c9d54da07..7514911bab3 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1618,6 +1618,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h + - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h - src/core/ext/filters/client_channel/lb_policy/subchannel_list.h - src/core/ext/filters/client_channel/lb_policy_factory.h - src/core/ext/filters/client_channel/lb_policy_registry.h @@ -1828,6 +1829,7 @@ libs: - src/core/lib/transport/transport.h - src/core/lib/transport/transport_impl.h - src/core/lib/uri/uri_parser.h + - third_party/xxhash/xxhash.h src: - src/core/ext/filters/census/grpc_context.cc - src/core/ext/filters/client_channel/backend_metric.cc @@ -1854,6 +1856,7 @@ libs: - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc - src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc - src/core/ext/filters/client_channel/lb_policy/priority/priority.cc + - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc - src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc - src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc - src/core/ext/filters/client_channel/lb_policy_registry.cc diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 
bdb54a06136..1aa0719a3d9 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -75,6 +75,7 @@ some configuration as environment variables that can be set. in DEBUG) - priority_lb - traces priority LB policy - resource_quota - trace resource quota objects internals + - ring_hash_lb - traces the ring hash load balancing policy - round_robin - traces the round_robin load balancing policy - queue_pluck - server_channel - lightweight trace of significant server channel events diff --git a/grpc.gyp b/grpc.gyp index 2418290d261..b717f768564 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1122,6 +1122,7 @@ 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/priority/priority.cc', + 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc', 'src/core/ext/filters/client_channel/lb_policy_registry.cc', diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 921bd2e0078..ad001d87b12 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -16,8 +16,767 @@ #include +#include +#include + +#include "absl/strings/numbers.h" +#include "absl/strings/str_cat.h" +#define XXH_INLINE_ALL +#include "xxhash.h" + +#include +#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/channel/channel_args.h" +#include 
"src/core/lib/debug/trace.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/static_metadata.h" + namespace grpc_core { const char* kRequestRingHashAttribute = "request_ring_hash"; +TraceFlag grpc_lb_ring_hash_trace(false, "ring_hash_lb"); + +// Helper Parser method +void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size, + size_t* max_ring_size, + std::vector* error_list) { + *min_ring_size = 1024; + *max_ring_size = 8388608; + if (json.type() != Json::Type::OBJECT) { + error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "ring_hash_experimental should be of type object")); + return; + } + const Json::Object& ring_hash = json.object_value(); + auto ring_hash_it = ring_hash.find("min_ring_size"); + if (ring_hash_it != ring_hash.end()) { + if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:min_ring_size error: should be of type number")); + } else { + *min_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + } + ring_hash_it = ring_hash.find("max_ring_size"); + if (ring_hash_it != ring_hash.end()) { + if (ring_hash_it->second.type() != Json::Type::NUMBER) { + error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size error: should be of type number")); + } else { + *max_ring_size = gpr_parse_nonnegative_int( + ring_hash_it->second.string_value().c_str()); + } + } + if (*min_ring_size == 0 || *min_ring_size > 8388608 || *max_ring_size == 0 || + *max_ring_size > 8388608 || *min_ring_size > *max_ring_size) { + error_list->push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:max_ring_size and or min_ring_size error: " + "values need to be in the range of 1 
to 8388608 " + "and max_ring_size cannot be smaller than " + "min_ring_size")); + } +} + +namespace { + +constexpr char kRingHash[] = "ring_hash_experimental"; + +class RingHashLbConfig : public LoadBalancingPolicy::Config { + public: + RingHashLbConfig(size_t min_ring_size, size_t max_ring_size) + : min_ring_size_(min_ring_size), max_ring_size_(max_ring_size) {} + const char* name() const override { return kRingHash; } + size_t min_ring_size() const { return min_ring_size_; } + size_t max_ring_size() const { return max_ring_size_; } + + private: + size_t min_ring_size_; + size_t max_ring_size_; +}; + +// +// ring_hash LB policy +// +class RingHash : public LoadBalancingPolicy { + public: + explicit RingHash(Args args); + + const char* name() const override { return kRingHash; } + + void UpdateLocked(UpdateArgs args) override; + void ResetBackoffLocked() override; + + private: + ~RingHash() override; + + // Forward declaration. + class RingHashSubchannelList; + + // Data for a particular subchannel in a subchannel list. + // This subclass adds the following functionality: + // - Tracks the previous connectivity state of the subchannel, so that + // we know how many subchannels are in each state. + class RingHashSubchannelData + : public SubchannelData { + public: + RingHashSubchannelData( + SubchannelList* + subchannel_list, + const ServerAddress& address, + RefCountedPtr subchannel) + : SubchannelData(subchannel_list, address, std::move(subchannel)), + address_(address) {} + + grpc_connectivity_state connectivity_state() const { + return last_connectivity_state_; + } + const ServerAddress& address() const { return address_; } + + bool seen_failure_since_ready() const { return seen_failure_since_ready_; } + + // Performs connectivity state updates that need to be done both when we + // first start watching and when a watcher notification is received. 
+ void UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state); + + private: + // Performs connectivity state updates that need to be done only + // after we have started watching. + void ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state) override; + + ServerAddress address_; + grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN; + bool seen_failure_since_ready_ = false; + }; + + // A list of subchannels. + class RingHashSubchannelList + : public SubchannelList { + public: + RingHashSubchannelList(RingHash* policy, TraceFlag* tracer, + ServerAddressList addresses, + const grpc_channel_args& args) + : SubchannelList(policy, tracer, std::move(addresses), + policy->channel_control_helper(), args) { + // Need to maintain a ref to the LB policy as long as we maintain + // any references to subchannels, since the subchannels' + // pollset_sets will include the LB policy's pollset_set. + policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); + } + + ~RingHashSubchannelList() override { + RingHash* p = static_cast(policy()); + p->Unref(DEBUG_LOCATION, "subchannel_list"); + } + + // Starts watching the subchannels in this list. + void StartWatchingLocked(); + + // Updates the counters of subchannels in each state when a + // subchannel transitions from old_state to new_state. + void UpdateStateCountersLocked(grpc_connectivity_state old_state, + grpc_connectivity_state new_state); + + // Updates the RH policy's connectivity state based on the + // subchannel list's state counters, creating new picker and new ring. + // Furthermore, return a bool indicating whether the aggregated state is + // Transient Failure. 
+ bool UpdateRingHashConnectivityStateLocked(); + + private: + size_t num_idle_ = 0; + size_t num_ready_ = 0; + size_t num_connecting_ = 0; + size_t num_transient_failure_ = 0; + }; + + class Picker : public SubchannelPicker { + public: + Picker(RefCountedPtr parent, + RingHashSubchannelList* subchannel_list); + + PickResult Pick(PickArgs args) override; + + private: + struct RingEntry { + uint64_t hash; + RefCountedPtr subchannel; + grpc_connectivity_state connectivity_state; + }; + + // A fire-and-forget class that schedules subchannel connection attempts + // on the control plane WorkSerializer. + class SubchannelConnectionAttempter : public Orphanable { + public: + explicit SubchannelConnectionAttempter( + RefCountedPtr ring_hash_lb) + : ring_hash_lb_(std::move(ring_hash_lb)) { + GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); + } + + void AddSubchannel(RefCountedPtr subchannel) { + subchannels_.push_back(std::move(subchannel)); + } + + void Orphan() override { + // Hop into ExecCtx, so that we're not holding the data plane mutex + // while we run control-plane code. + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); + } + + private: + static void RunInExecCtx(void* arg, grpc_error* /*error*/) { + auto* self = static_cast(arg); + self->ring_hash_lb_->work_serializer()->Run( + [self]() { + if (!self->ring_hash_lb_->shutdown_) { + for (auto& subchannel : self->subchannels_) { + subchannel->AttemptToConnect(); + } + } + delete self; + }, + DEBUG_LOCATION); + } + + RefCountedPtr ring_hash_lb_; + grpc_closure closure_; + absl::InlinedVector, 10> subchannels_; + }; + + RefCountedPtr parent_; + + // A ring of subchannels. + std::vector ring_; + }; + + void ShutdownLocked() override; + + // Current config from resolver. + RefCountedPtr config_; + + // list of subchannels. + OrphanablePtr subchannel_list_; + // indicating if we are shutting down. 
+ bool shutdown_ = false; +}; + +// +// RingHash::Picker +// + +RingHash::Picker::Picker(RefCountedPtr parent, + RingHashSubchannelList* subchannel_list) + : parent_(std::move(parent)) { + size_t num_subchannels = subchannel_list->num_subchannels(); + // Store the weights while finding the sum. + struct AddressWeight { + std::string address; + // Default weight is 1 for the cases where a weight is not provided, + // each occurrence of the address will be counted a weight value of 1. + uint32_t weight = 1; + double normalized_weight; + }; + std::vector address_weights; + size_t sum = 0; + address_weights.reserve(num_subchannels); + for (size_t i = 0; i < num_subchannels; ++i) { + RingHashSubchannelData* sd = subchannel_list->subchannel(i); + const ServerAddressWeightAttribute* weight_attribute = static_cast< + const ServerAddressWeightAttribute*>(sd->address().GetAttribute( + ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); + AddressWeight address_weight; + address_weight.address = + grpc_sockaddr_to_string(&sd->address().address(), false); + if (weight_attribute != nullptr) { + GPR_ASSERT(weight_attribute->weight() != 0); + address_weight.weight = weight_attribute->weight(); + } + sum += address_weight.weight; + address_weights.push_back(std::move(address_weight)); + } + // Calculating normalized weights and find min and max. + double min_normalized_weight = 1.0; + double max_normalized_weight = 0.0; + for (auto& address : address_weights) { + address.normalized_weight = static_cast(address.weight) / sum; + min_normalized_weight = + std::min(address.normalized_weight, min_normalized_weight); + max_normalized_weight = + std::max(address.normalized_weight, max_normalized_weight); + } + // Scale up the number of hashes per host such that the least-weighted host + // gets a whole number of hashes on the ring. Other hosts might not end up + // with whole numbers, and that's fine (the ring-building algorithm below can + // handle this). 
This preserves the original implementation's behavior: when + // weights aren't provided, all hosts should get an equal number of hashes. In + // the case where this number exceeds the max_ring_size, it's scaled back down + // to fit. + const size_t min_ring_size = parent_->config_->min_ring_size(); + const size_t max_ring_size = parent_->config_->max_ring_size(); + const double scale = std::min( + std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, + static_cast(max_ring_size)); + // Reserve memory for the entire ring up front. + const uint64_t ring_size = std::ceil(scale); + ring_.reserve(ring_size); + // Populate the hash ring by walking through the (host, weight) pairs in + // normalized_host_weights, and generating (scale * weight) hashes for each + // host. Since these aren't necessarily whole numbers, we maintain running + // sums -- current_hashes and target_hashes -- which allows us to populate the + // ring in a mostly stable way. + absl::InlinedVector hash_key_buffer; + double current_hashes = 0.0; + double target_hashes = 0.0; + uint64_t min_hashes_per_host = ring_size; + uint64_t max_hashes_per_host = 0; + for (size_t i = 0; i < num_subchannels; ++i) { + const std::string& address_string = address_weights[i].address; + hash_key_buffer.assign(address_string.begin(), address_string.end()); + hash_key_buffer.emplace_back('_'); + auto offset_start = hash_key_buffer.end(); + target_hashes += scale * address_weights[i].normalized_weight; + size_t count = 0; + auto current_state = + subchannel_list->subchannel(i)->subchannel()->CheckConnectivityState(); + while (current_hashes < target_hashes) { + const std::string count_str = absl::StrCat(count); + hash_key_buffer.insert(offset_start, count_str.begin(), count_str.end()); + absl::string_view hash_key(hash_key_buffer.data(), + hash_key_buffer.size()); + const uint64_t hash = XXH64(hash_key.data(), hash_key.size(), 0); + ring_.push_back({hash, + 
subchannel_list->subchannel(i)->subchannel()->Ref(), + current_state}); + ++count; + ++current_hashes; + hash_key_buffer.erase(offset_start, hash_key_buffer.end()); + } + min_hashes_per_host = + std::min(static_cast(i), min_hashes_per_host); + max_hashes_per_host = + std::max(static_cast(i), max_hashes_per_host); + } + std::sort(ring_.begin(), ring_.end(), + [](const RingEntry& lhs, const RingEntry& rhs) -> bool { + return lhs.hash < rhs.hash; + }); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p picker %p] created picker from subchannel_list=%p " + "with %" PRIuPTR " ring entries", + parent_.get(), this, subchannel_list, ring_.size()); + // for (const auto& r : ring_) { + // gpr_log(GPR_INFO, "donn ring hash: %" PRIx64 " subchannel: %p state: %d", + // r.hash, r.subchannel.get(), r.connectivity_state); + //} + } +} + +RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { + PickResult result; + // Initialize to PICK_FAILED. + result.type = PickResult::PICK_FAILED; + auto hash = + args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute); + uint64_t h; + if (!absl::SimpleAtoi(hash, &h)) { + result.error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("xds ring hash value is not a number").c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + return result; + } + // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c + // (ketama_get_server) NOTE: The algorithm depends on using signed integers + // for lowp, highp, and first_index. Do not change them! + int64_t lowp = 0; + int64_t highp = ring_.size(); + int64_t first_index = 0; + while (true) { + first_index = (lowp + highp) / 2; + if (first_index == static_cast(ring_.size())) { + first_index = 0; + break; + } + uint64_t midval = ring_[first_index].hash; + uint64_t midval1 = first_index == 0 ? 
0 : ring_[first_index - 1].hash; + if (h <= midval && h > midval1) { + break; + } + if (midval < h) { + lowp = first_index + 1; + } else { + highp = first_index - 1; + } + if (lowp > highp) { + first_index = 0; + break; + } + } + OrphanablePtr subchannel_connection_attempter; + auto ScheduleSubchannelConnectionAttempt = + [&](RefCountedPtr subchannel) { + if (subchannel_connection_attempter == nullptr) { + subchannel_connection_attempter = + MakeOrphanable(parent_); + } + subchannel_connection_attempter->AddSubchannel(std::move(subchannel)); + }; + switch (ring_[first_index].connectivity_state) { + case GRPC_CHANNEL_READY: + result.type = PickResult::PICK_COMPLETE; + result.subchannel = ring_[first_index].subchannel; + return result; + case GRPC_CHANNEL_IDLE: + ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel); + // fallthrough + case GRPC_CHANNEL_CONNECTING: + result.type = PickResult::PICK_QUEUE; + return result; + default: // GRPC_CHANNEL_TRANSIENT_FAILURE + break; + } + ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel); + // Loop through remaining subchannels to find one in READY. + // On the way, we make sure the right set of connection attempts + // will happen. 
+ bool found_second_subchannel = false; + bool found_first_non_failed = false; + for (size_t i = 1; i < ring_.size(); ++i) { + const RingEntry& entry = ring_[(first_index + i) % ring_.size()]; + if (entry.subchannel == ring_[first_index].subchannel) { + continue; + } + if (entry.connectivity_state == GRPC_CHANNEL_READY) { + result.type = PickResult::PICK_COMPLETE; + result.subchannel = entry.subchannel; + return result; + } + if (!found_second_subchannel) { + switch (entry.connectivity_state) { + case GRPC_CHANNEL_IDLE: + ScheduleSubchannelConnectionAttempt(entry.subchannel); + // fallthrough + case GRPC_CHANNEL_CONNECTING: + result.type = PickResult::PICK_QUEUE; + return result; + default: + break; + } + found_second_subchannel = true; + } + if (!found_first_non_failed) { + if (entry.connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ScheduleSubchannelConnectionAttempt(entry.subchannel); + } else { + if (entry.connectivity_state == GRPC_CHANNEL_IDLE) { + ScheduleSubchannelConnectionAttempt(entry.subchannel); + } + found_first_non_failed = true; + } + } + } + result.error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat("xds ring hash found a subchannel " + "that is in TRANSIENT_FAILURE state") + .c_str()), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); + return result; +} + +// +// RingHash::RingHashSubchannelList +// + +void RingHash::RingHashSubchannelList::StartWatchingLocked() { + if (num_subchannels() == 0) return; + // Check current state of each subchannel synchronously. + for (size_t i = 0; i < num_subchannels(); ++i) { + grpc_connectivity_state state = + subchannel(i)->CheckConnectivityStateLocked(); + subchannel(i)->UpdateConnectivityStateLocked(state); + } + // Start connectivity watch for each subchannel. 
+ for (size_t i = 0; i < num_subchannels(); i++) { + if (subchannel(i)->subchannel() != nullptr) { + subchannel(i)->StartConnectivityWatchLocked(); + } + } + RingHash* p = static_cast(policy()); + // Sending up the initial picker while all subchannels are in IDLE state. + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, absl::Status(), + absl::make_unique(p->Ref(DEBUG_LOCATION, "RingHashPicker"), + this)); +} + +void RingHash::RingHashSubchannelList::UpdateStateCountersLocked( + grpc_connectivity_state old_state, grpc_connectivity_state new_state) { + GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); + if (old_state == GRPC_CHANNEL_IDLE) { + GPR_ASSERT(num_idle_ > 0); + --num_idle_; + } else if (old_state == GRPC_CHANNEL_READY) { + GPR_ASSERT(num_ready_ > 0); + --num_ready_; + } else if (old_state == GRPC_CHANNEL_CONNECTING) { + GPR_ASSERT(num_connecting_ > 0); + --num_connecting_; + } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + GPR_ASSERT(num_transient_failure_ > 0); + --num_transient_failure_; + } + if (new_state == GRPC_CHANNEL_IDLE) { + ++num_idle_; + } else if (new_state == GRPC_CHANNEL_READY) { + ++num_ready_; + } else if (new_state == GRPC_CHANNEL_CONNECTING) { + ++num_connecting_; + } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + ++num_transient_failure_; + } +} + +// Sets the RH policy's connectivity state and generates a new picker based +// on the current subchannel list, or requests a re-attempt by returning true. +bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() { + RingHash* p = static_cast(policy()); + // Only set connectivity state if this is the current subchannel list. + if (p->subchannel_list_.get() != this) return false; + // The overall aggregation rules here are: + // 1. If there is at least one subchannel in READY state, report READY. + // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report + // TRANSIENT_FAILURE. + // 3. 
If there is at least one subchannel in CONNECTING state, report + // CONNECTING. + // 4. If there is at least one subchannel in IDLE state, report IDLE. + // 5. Otherwise, report TRANSIENT_FAILURE. + if (num_ready_ > 0) { + /* READY */ + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, absl::Status(), + absl::make_unique(p->Ref(DEBUG_LOCATION, "RingHashPicker"), + this)); + return false; + } + if (num_connecting_ > 0 && num_transient_failure_ < 2) { + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_CONNECTING, absl::Status(), + absl::make_unique(p->Ref(DEBUG_LOCATION, "QueuePicker"))); + return false; + } + if (num_idle_ > 0 && num_transient_failure_ < 2) { + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_IDLE, absl::Status(), + absl::make_unique(p->Ref(DEBUG_LOCATION, "RingHashPicker"), + this)); + return false; + } + grpc_error* error = + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connections to backend failing or idle"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + p->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), + absl::make_unique(error)); + return true; +} + +// +// RingHash::RingHashSubchannelData +// + +void RingHash::RingHashSubchannelData::UpdateConnectivityStateLocked( + grpc_connectivity_state connectivity_state) { + RingHash* p = static_cast(subchannel_list()->policy()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log( + GPR_INFO, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " + "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", + p, subchannel(), subchannel_list(), Index(), + subchannel_list()->num_subchannels(), + ConnectivityStateName(last_connectivity_state_), + ConnectivityStateName(connectivity_state)); + } + // Decide what state to report for aggregation purposes. 
+ // If we haven't seen a failure since the last time we were in state + // READY, then we report the state change as-is. However, once we do see + // a failure, we report TRANSIENT_FAILURE and do not report any subsequent + // state changes until we go back into state READY. + if (!seen_failure_since_ready_) { + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + seen_failure_since_ready_ = true; + } + subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, + connectivity_state); + } else { + if (connectivity_state == GRPC_CHANNEL_READY) { + seen_failure_since_ready_ = false; + subchannel_list()->UpdateStateCountersLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, connectivity_state); + } + } + // Record last seen connectivity state. + last_connectivity_state_ = connectivity_state; +} + +void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( + grpc_connectivity_state connectivity_state) { + RingHash* p = static_cast(subchannel_list()->policy()); + GPR_ASSERT(subchannel() != nullptr); + // If the new state is TRANSIENT_FAILURE, re-resolve. + // Only do this if we've started watching, not at startup time. + // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE + // when the subchannel list was created, we'd wind up in a constant + // loop of re-resolution. + // Also attempt to reconnect. + if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " + "Requesting re-resolution", + p, subchannel()); + } + p->channel_control_helper()->RequestReresolution(); + } + // Update state counters. + UpdateConnectivityStateLocked(connectivity_state); + // Update the RH policy's connectivity state, creating new picker and new + // ring. 
+ bool transient_failure = + subchannel_list()->UpdateRingHashConnectivityStateLocked(); + // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will + // not be getting any pick requests from the priority policy. + // However, because the ring_hash policy does not attempt to + // reconnect to subchannels unless it is getting pick requests, + // it will need special handling to ensure that it will eventually + // recover from TRANSIENT_FAILURE state once the problem is resolved. + // Specifically, it will make sure that it is attempting to connect to + // at least one subchannel at any given time. After a given subchannel + // fails a connection attempt, it will move on to the next subchannel + // in the ring. It will keep doing this until one of the subchannels + // successfully connects, at which point it will report READY and stop + // proactively trying to connect. The policy will remain in + // TRANSIENT_FAILURE until at least one subchannel becomes connected, + // even if subchannels are in state CONNECTING during that time. 
+  if (transient_failure &&
+      connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
+    size_t next_index = (Index() + 1) % subchannel_list()->num_subchannels();
+    RingHashSubchannelData* next_sd = subchannel_list()->subchannel(next_index);
+    next_sd->subchannel()->AttemptToConnect();
+  }
+}
+
+//
+// RingHash
+//
+
+RingHash::RingHash(Args args) : LoadBalancingPolicy(std::move(args)) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Created", this);
+  }
+}
+
+RingHash::~RingHash() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this);
+  }
+  GPR_ASSERT(subchannel_list_ == nullptr);
+}
+
+void RingHash::ShutdownLocked() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] Shutting down", this);
+  }
+  shutdown_ = true;
+  subchannel_list_.reset();
+}
+
+void RingHash::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); }
+
+void RingHash::UpdateLocked(UpdateArgs args) {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
+    gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
+            this, args.addresses.size());
+  }
+  config_ = std::move(args.config);
+  // Filter out any address with weight 0.
+  ServerAddressList addresses;
+  addresses.reserve(args.addresses.size());
+  for (ServerAddress& address : args.addresses) {
+    const ServerAddressWeightAttribute* weight_attribute =
+        static_cast<const ServerAddressWeightAttribute*>(address.GetAttribute(
+            ServerAddressWeightAttribute::kServerAddressWeightAttributeKey));
+    if (weight_attribute == nullptr || weight_attribute->weight() > 0) {
+      addresses.push_back(std::move(address));
+    }
+  }
+  subchannel_list_ = MakeOrphanable<RingHashSubchannelList>(
+      this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
+  if (subchannel_list_->num_subchannels() == 0) {
+    // If the new list is empty, immediately transition to TRANSIENT_FAILURE.
+    grpc_error* error =
+        grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
+                           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+    channel_control_helper()->UpdateState(
+        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
+        absl::make_unique<TransientFailurePicker>(error));
+  } else {
+    // Start watching the new list.
+    subchannel_list_->StartWatchingLocked();
+  }
+}
+
+//
+// factory
+//
+
+class RingHashFactory : public LoadBalancingPolicyFactory {
+ public:
+  OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
+      LoadBalancingPolicy::Args args) const override {
+    return MakeOrphanable<RingHash>(std::move(args));
+  }
+
+  const char* name() const override { return kRingHash; }
+
+  RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
+      const Json& json, grpc_error** error) const override {
+    size_t min_ring_size;
+    size_t max_ring_size;
+    std::vector<grpc_error*> error_list;
+    ParseRingHashLbConfig(json, &min_ring_size, &max_ring_size, &error_list);
+    if (error_list.empty()) {
+      return MakeRefCounted<RingHashLbConfig>(min_ring_size, max_ring_size);
+    } else {
+      *error = GRPC_ERROR_CREATE_FROM_VECTOR(
+          "ring_hash_experimental LB policy config", &error_list);
+      return nullptr;
+    }
+  }
+};
+
+}  // namespace
+
+void GrpcLbPolicyRingHashInit() {
+  grpc_core::LoadBalancingPolicyRegistry::Builder::
+      RegisterLoadBalancingPolicyFactory(
+          absl::make_unique<RingHashFactory>());
+}
+
+void GrpcLbPolicyRingHashShutdown() {}
 }  // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
index dc176c2dc49..f0f86b25239 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
@@ -19,9 +19,19 @@
 #include <grpc/support/port_platform.h>
 
+#include <stddef.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/json/json.h"
+
 namespace grpc_core {
 
 extern const char* kRequestRingHashAttribute;
 
+// Helper Parsing method to parse ring hash policy configs; for example, ring
+// hash size validity.
+void ParseRingHashLbConfig(const Json& json, size_t* min_ring_size,
+                           size_t* max_ring_size,
+                           std::vector<grpc_error*>* error_list);
 }  // namespace grpc_core
 
 #endif  // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_RING_HASH_RING_HASH_H
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
index 49bff276958..3531ca508bf 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
@@ -452,22 +452,9 @@ void CdsLb::OnClusterChanged(const std::string& name,
   // Construct config for child policy.
   Json::Object xds_lb_policy;
   if (cluster_data.lb_policy == "RING_HASH") {
-    std::string hash_function;
-    switch (cluster_data.hash_function) {
-      case XdsApi::CdsUpdate::HashFunction::XX_HASH:
-        hash_function = "XX_HASH";
-        break;
-      case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
-        hash_function = "MURMUR_HASH_2";
-        break;
-      default:
-        GPR_ASSERT(0);
-        break;
-    }
     xds_lb_policy["RING_HASH"] = Json::Object{
         {"min_ring_size", cluster_data.min_ring_size},
         {"max_ring_size", cluster_data.max_ring_size},
-        {"hash_function", hash_function},
     };
   } else {
     xds_lb_policy["ROUND_ROBIN"] = Json::Object();
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
index bcb8194561b..62851e8d68a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
@@ -28,6 +28,7 @@
 #include "src/core/ext/filters/client_channel/lb_policy.h"
 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
+#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" @@ -834,6 +835,13 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { std::vector hierarchical_path = { priority_child_name, locality_name->AsHumanReadableString()}; for (const auto& endpoint : locality.endpoints) { + const ServerAddressWeightAttribute* weight_attribute = static_cast< + const ServerAddressWeightAttribute*>(endpoint.GetAttribute( + ServerAddressWeightAttribute::kServerAddressWeightAttributeKey)); + uint32_t weight = locality.lb_weight; + if (weight_attribute != nullptr) { + weight = locality.lb_weight * weight_attribute->weight(); + } addresses.emplace_back( endpoint .WithAttribute(kHierarchicalPathAttributeKey, @@ -841,10 +849,10 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { .WithAttribute(kXdsLocalityNameAttributeKey, absl::make_unique( locality_name->Ref())) - .WithAttribute(ServerAddressWeightAttribute:: - kServerAddressWeightAttributeKey, - absl::make_unique( - locality.lb_weight))); + .WithAttribute( + ServerAddressWeightAttribute:: + kServerAddressWeightAttributeKey, + absl::make_unique(weight))); } } } @@ -1201,65 +1209,11 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory { } policy_it = policy.find("RING_HASH"); if (policy_it != policy.end()) { - if (policy_it->second.type() != Json::Type::OBJECT) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:RING_HASH error:type should be object")); - continue; - } - // TODO(donnadionne): Move this to a method in - // ring_hash_experimental and call it here. 
- const Json::Object& ring_hash = policy_it->second.object_value(); xds_lb_policy = array[i]; - size_t min_ring_size = 1024; - size_t max_ring_size = 8388608; - auto ring_hash_it = ring_hash.find("min_ring_size"); - if (ring_hash_it == ring_hash.end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:min_ring_size missing")); - } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:min_ring_size error: should be of " - "number")); - } else { - min_ring_size = gpr_parse_nonnegative_int( - ring_hash_it->second.string_value().c_str()); - } - ring_hash_it = ring_hash.find("max_ring_size"); - if (ring_hash_it == ring_hash.end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:max_ring_size missing")); - } else if (ring_hash_it->second.type() != Json::Type::NUMBER) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:max_ring_size error: should be of " - "number")); - } else { - max_ring_size = gpr_parse_nonnegative_int( - ring_hash_it->second.string_value().c_str()); - } - if (min_ring_size <= 0 || min_ring_size > 8388608 || - max_ring_size <= 0 || max_ring_size > 8388608 || - min_ring_size > max_ring_size) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:max_ring_size and or min_ring_size error: " - "values need to be in the range of 1 to 8388608 " - "and max_ring_size cannot be smaller than " - "min_ring_size")); - } - ring_hash_it = ring_hash.find("hash_function"); - if (ring_hash_it == ring_hash.end()) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:hash_function missing")); - } else if (ring_hash_it->second.type() != Json::Type::STRING) { - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:hash_function error: should be a " - "string")); - } else if (ring_hash_it->second.string_value() != "XX_HASH" && - ring_hash_it->second.string_value() != "MURMUR_HASH_2") 
{ - error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "field:hash_function error: unsupported " - "hash_function")); - } - break; + size_t min_ring_size; + size_t max_ring_size; + ParseRingHashLbConfig(policy_it->second, &min_ring_size, + &max_ring_size, &error_list); } } } diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 3a10a423630..871d756a4f5 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -568,6 +568,9 @@ absl::optional HeaderHashHelper( std::string value_buffer; absl::optional header_value = GetHeaderValue(initial_metadata, policy.header_name, &value_buffer); + if (!header_value.has_value()) { + return absl::nullopt; + } if (policy.regex != nullptr) { // If GetHeaderValue() did not already store the value in // value_buffer, copy it there now, so we can modify it. @@ -649,10 +652,13 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( case XdsApi::Route::HashPolicy::HEADER: new_hash = HeaderHashHelper(hash_policy, args.initial_metadata); break; - case XdsApi::Route::HashPolicy::CHANNEL_ID: - new_hash = - static_cast(reinterpret_cast(resolver)); + case XdsApi::Route::HashPolicy::CHANNEL_ID: { + std::string address_str = absl::StrFormat( + "%" PRIu64, + static_cast(reinterpret_cast(resolver))); + new_hash = XXH64(address_str.c_str(), address_str.length(), 0); break; + } default: GPR_ASSERT(0); } @@ -671,7 +677,12 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( } if (!hash.has_value()) { // If there is no hash, we just choose a random value as a default. - hash = rand(); + // We cannot directly use the result of rand() as the hash value, + // since it is a 32-bit number and not a 64-bit number and will + // therefore not be evenly distributed. 
+ uint32_t upper = rand(); + uint32_t lower = rand(); + hash = (static_cast(upper) << 32) | lower; } CallConfig call_config; if (method_config != nullptr) { @@ -680,8 +691,12 @@ ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( call_config.service_config = std::move(method_config); } call_config.call_attributes[kXdsClusterAttribute] = it->first; - call_config.call_attributes[kRequestRingHashAttribute] = - absl::StrFormat("%" PRIu64, hash.value()); + std::string hash_string = absl::StrCat(hash.value()); + char* hash_value = + static_cast(args.arena->Alloc(hash_string.size() + 1)); + memcpy(hash_value, hash_string.c_str(), hash_string.size()); + hash_value[hash_string.size()] = '\0'; + call_config.call_attributes[kRequestRingHashAttribute] = hash_value; call_config.on_call_committed = [resolver, cluster_state]() { cluster_state->Unref(); ExecCtx::Run( diff --git a/src/core/ext/xds/xds_api.cc b/src/core/ext/xds/xds_api.cc index e51bc07cbb1..610d4e22863 100644 --- a/src/core/ext/xds/xds_api.cc +++ b/src/core/ext/xds/xds_api.cc @@ -1605,40 +1605,35 @@ grpc_error_handle RouteActionParse(const EncodingContext& context, regex_rewrite = envoy_config_route_v3_RouteAction_HashPolicy_Header_regex_rewrite( header); - if (regex_rewrite == nullptr) { - gpr_log( - GPR_DEBUG, - "RouteAction HashPolicy contains policy specifier Header with " - "RegexMatchAndSubstitution but Regex is missing"); - continue; - } - const envoy_type_matcher_v3_RegexMatcher* regex_matcher = - envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern( - regex_rewrite); - if (regex_matcher == nullptr) { - gpr_log( - GPR_DEBUG, - "RouteAction HashPolicy contains policy specifier Header with " - "RegexMatchAndSubstitution but RegexMatcher pattern is " - "missing"); - continue; - } - RE2::Options options; - policy.regex = absl::make_unique( - UpbStringToStdString( - envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)), - options); - if (!policy.regex->ok()) { - gpr_log( - GPR_DEBUG, - 
"RouteAction HashPolicy contains policy specifier Header with " - "RegexMatchAndSubstitution but RegexMatcher pattern does not " - "compile"); - continue; + if (regex_rewrite != nullptr) { + const envoy_type_matcher_v3_RegexMatcher* regex_matcher = + envoy_type_matcher_v3_RegexMatchAndSubstitute_pattern( + regex_rewrite); + if (regex_matcher == nullptr) { + gpr_log( + GPR_DEBUG, + "RouteAction HashPolicy contains policy specifier Header with " + "RegexMatchAndSubstitution but RegexMatcher pattern is " + "missing"); + continue; + } + RE2::Options options; + policy.regex = absl::make_unique( + UpbStringToStdString( + envoy_type_matcher_v3_RegexMatcher_regex(regex_matcher)), + options); + if (!policy.regex->ok()) { + gpr_log( + GPR_DEBUG, + "RouteAction HashPolicy contains policy specifier Header with " + "RegexMatchAndSubstitution but RegexMatcher pattern does not " + "compile"); + continue; + } + policy.regex_substitution = UpbStringToStdString( + envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution( + regex_rewrite)); } - policy.regex_substitution = UpbStringToStdString( - envoy_type_matcher_v3_RegexMatchAndSubstitute_substitution( - regex_rewrite)); } else if ((filter_state = envoy_config_route_v3_RouteAction_HashPolicy_filter_state( hash_policy)) != nullptr) { @@ -2815,75 +2810,61 @@ grpc_error_handle CdsResponseParse( // Record ring hash lb config auto* ring_hash_config = envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster); - if (ring_hash_config == nullptr) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": ring hash lb config required but not present.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } - const google_protobuf_UInt64Value* max_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( - ring_hash_config); - if (max_ring_size != nullptr) { - cds_update.max_ring_size = - google_protobuf_UInt64Value_value(max_ring_size); - if 
(cds_update.max_ring_size > 8388608 || - cds_update.max_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": max_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + if (ring_hash_config != nullptr) { + const google_protobuf_UInt64Value* max_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size( + ring_hash_config); + if (max_ring_size != nullptr) { + cds_update.max_ring_size = + google_protobuf_UInt64Value_value(max_ring_size); + if (cds_update.max_ring_size > 8388608 || + cds_update.max_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": max_ring_size is not in the range of 1 to 8388608.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } } - } - const google_protobuf_UInt64Value* min_ring_size = - envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( - ring_hash_config); - if (min_ring_size != nullptr) { - cds_update.min_ring_size = - google_protobuf_UInt64Value_value(min_ring_size); - if (cds_update.min_ring_size > 8388608 || - cds_update.min_ring_size == 0) { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size is not in the range of 1 to 8388608.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; + const google_protobuf_UInt64Value* min_ring_size = + envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size( + ring_hash_config); + if (min_ring_size != nullptr) { + cds_update.min_ring_size = + google_protobuf_UInt64Value_value(min_ring_size); + if (cds_update.min_ring_size > 8388608 || + cds_update.min_ring_size == 0) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": min_ring_size is not in the range of 1 to 8388608.") + .c_str())); + 
resource_names_failed->insert(cluster_name); + continue; + } + if (cds_update.min_ring_size > cds_update.max_ring_size) { + errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( + absl::StrCat( + cluster_name, + ": min_ring_size cannot be greater than max_ring_size.") + .c_str())); + resource_names_failed->insert(cluster_name); + continue; + } } - if (cds_update.min_ring_size > cds_update.max_ring_size) { + if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( + ring_hash_config) != + envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat( - cluster_name, - ": min_ring_size cannot be greater than max_ring_size.") + absl::StrCat(cluster_name, + ": ring hash lb config has invalid hash function.") .c_str())); resource_names_failed->insert(cluster_name); continue; } } - if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( - ring_hash_config) == - envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) { - cds_update.hash_function = XdsApi::CdsUpdate::HashFunction::XX_HASH; - } else if ( - envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function( - ring_hash_config) == - envoy_config_cluster_v3_Cluster_RingHashLbConfig_MURMUR_HASH_2) { - cds_update.hash_function = - XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2; - } else { - errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( - absl::StrCat(cluster_name, - ": ring hash lb config has invalid hash function.") - .c_str())); - resource_names_failed->insert(cluster_name); - continue; - } } else { errors.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat(cluster_name, ": LB policy is not supported.").c_str())); @@ -3014,13 +2995,28 @@ grpc_error_handle ServerAddressParseAndAppend( if (GPR_UNLIKELY(port >> 16) != 0) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Invalid port."); } + // Find load_balancing_weight for the endpoint. 
+ const google_protobuf_UInt32Value* load_balancing_weight = + envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint); + const int32_t weight = + load_balancing_weight != nullptr + ? google_protobuf_UInt32Value_value(load_balancing_weight) + : 500; + if (weight == 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Invalid endpoint weight of 0."); + } // Populate grpc_resolved_address. grpc_resolved_address addr; grpc_error_handle error = grpc_string_to_sockaddr(&addr, address_str.c_str(), port); if (error != GRPC_ERROR_NONE) return error; // Append the address to the list. - list->emplace_back(addr, nullptr); + std::map> + attributes; + attributes[ServerAddressWeightAttribute::kServerAddressWeightAttributeKey] = + absl::make_unique(weight); + list->emplace_back(addr, nullptr, std::move(attributes)); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/xds/xds_api.h b/src/core/ext/xds/xds_api.h index e7bf1cd14e5..c1483491358 100644 --- a/src/core/ext/xds/xds_api.h +++ b/src/core/ext/xds/xds_api.h @@ -411,8 +411,6 @@ class XdsApi { // Used for RING_HASH LB policy only. uint64_t min_ring_size = 1024; uint64_t max_ring_size = 8388608; - enum HashFunction { XX_HASH, MURMUR_HASH_2 }; - HashFunction hash_function; // Maximum number of outstanding requests can be made to the upstream // cluster. 
uint32_t max_concurrent_requests = 1024; diff --git a/src/core/plugin_registry/grpc_plugin_registry.cc b/src/core/plugin_registry/grpc_plugin_registry.cc index d3def27e17a..271b0103d0f 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_plugin_registry.cc @@ -63,6 +63,8 @@ void grpc_workaround_cronet_compression_filter_shutdown(void); namespace grpc_core { void FaultInjectionFilterInit(void); void FaultInjectionFilterShutdown(void); +void GrpcLbPolicyRingHashInit(void); +void GrpcLbPolicyRingHashShutdown(void); } // namespace grpc_core #ifndef GRPC_NO_XDS @@ -115,6 +117,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_pick_first_shutdown); grpc_register_plugin(grpc_lb_policy_round_robin_init, grpc_lb_policy_round_robin_shutdown); + grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit, + grpc_core::GrpcLbPolicyRingHashShutdown); grpc_register_plugin(grpc_resolver_dns_ares_init, grpc_resolver_dns_ares_shutdown); grpc_register_plugin(grpc_resolver_dns_native_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc index 5e7452936a1..b58ab32838e 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.cc @@ -57,6 +57,8 @@ void grpc_message_size_filter_shutdown(void); namespace grpc_core { void FaultInjectionFilterInit(void); void FaultInjectionFilterShutdown(void); +void GrpcLbPolicyRingHashInit(void); +void GrpcLbPolicyRingHashShutdown(void); } // namespace grpc_core void grpc_service_config_channel_arg_filter_init(void); void grpc_service_config_channel_arg_filter_shutdown(void); @@ -94,6 +96,8 @@ void grpc_register_built_in_plugins(void) { grpc_lb_policy_pick_first_shutdown); grpc_register_plugin(grpc_lb_policy_round_robin_init, grpc_lb_policy_round_robin_shutdown); + grpc_register_plugin(grpc_core::GrpcLbPolicyRingHashInit, + 
grpc_core::GrpcLbPolicyRingHashShutdown); grpc_register_plugin(grpc_client_idle_filter_init, grpc_client_idle_filter_shutdown); grpc_register_plugin(grpc_max_age_filter_init, diff --git a/src/proto/grpc/testing/xds/v3/cluster.proto b/src/proto/grpc/testing/xds/v3/cluster.proto index c04fe20a919..2c26c9bd8aa 100644 --- a/src/proto/grpc/testing/xds/v3/cluster.proto +++ b/src/proto/grpc/testing/xds/v3/cluster.proto @@ -153,12 +153,50 @@ message Cluster { // Configuration to use for EDS updates for the Cluster. EdsClusterConfig eds_cluster_config = 3; + // Specific configuration for the :ref:`RingHash` + // load balancing policy. + message RingHashLbConfig { + // The hash function used to hash hosts onto the ketama ring. + enum HashFunction { + // Use `xxHash `_, this is the default hash function. + XX_HASH = 0; + MURMUR_HASH_2 = 1; + } + + reserved 2; + + // Minimum hash ring size. The larger the ring is (that is, the more hashes there are for each + // provided host) the better the request distribution will reflect the desired weights. Defaults + // to 1024 entries, and limited to 8M entries. See also + // :ref:`maximum_ring_size`. + google.protobuf.UInt64Value minimum_ring_size = 1; + + // The hash function used to hash hosts onto the ketama ring. The value defaults to + // :ref:`XX_HASH`. + HashFunction hash_function = 3; + + // Maximum hash ring size. Defaults to 8M entries, and limited to 8M entries, but can be lowered + // to further constrain resource use. See also + // :ref:`minimum_ring_size`. + google.protobuf.UInt64Value maximum_ring_size = 4; + } + // The :ref:`load balancer type ` to use // when picking a host in the cluster. LbPolicy lb_policy = 6; CircuitBreakers circuit_breakers = 10; + // Optional configuration for the load balancing algorithm selected by + // LbPolicy. Currently only + // :ref:`RING_HASH`, + // Specifying ring_hash_lb_config without setting the corresponding + // LbPolicy will generate an error at runtime. 
+ oneof lb_config { + // Optional configuration for the Ring Hash load balancing policy. + RingHashLbConfig ring_hash_lb_config = 23; + } + // Optional custom transport socket implementation to use for upstream connections. // To setup TLS, set a transport socket with name `tls` and // :ref:`UpstreamTlsContexts ` in the `typed_config`. diff --git a/src/proto/grpc/testing/xds/v3/endpoint.proto b/src/proto/grpc/testing/xds/v3/endpoint.proto index 7cc1d40ca6e..7cbea7f443f 100644 --- a/src/proto/grpc/testing/xds/v3/endpoint.proto +++ b/src/proto/grpc/testing/xds/v3/endpoint.proto @@ -76,6 +76,17 @@ message LbEndpoint { // Optional health status when known and supplied by EDS server. HealthStatus health_status = 2; + + // The optional load balancing weight of the upstream host; at least 1. + // Envoy uses the load balancing weight in some of the built in load + // balancers. The load balancing weight for an endpoint is divided by the sum + // of the weights of all endpoints in the endpoint's locality to produce a + // percentage of traffic for the endpoint. This percentage is then further + // weighted by the endpoint's locality's load balancing weight from + // LocalityLbEndpoints. If unspecified, each host is presumed to have equal + // weight in a locality. The sum of the weights of all endpoints in the + // endpoint's locality must not exceed uint32_t maximal value (4294967295). + google.protobuf.UInt32Value load_balancing_weight = 4; } // A group of endpoints belonging to a Locality. diff --git a/src/proto/grpc/testing/xds/v3/regex.proto b/src/proto/grpc/testing/xds/v3/regex.proto index af9045774f8..9039ed46441 100644 --- a/src/proto/grpc/testing/xds/v3/regex.proto +++ b/src/proto/grpc/testing/xds/v3/regex.proto @@ -36,3 +36,8 @@ message RegexMatcher { // The regex match string. The string must be supported by the configured engine. 
string regex = 2; } + +message RegexMatchAndSubstitute { + RegexMatcher pattern = 1; + string substitution = 2; +} diff --git a/src/proto/grpc/testing/xds/v3/route.proto b/src/proto/grpc/testing/xds/v3/route.proto index baeaaf644d4..89260f6ff77 100644 --- a/src/proto/grpc/testing/xds/v3/route.proto +++ b/src/proto/grpc/testing/xds/v3/route.proto @@ -246,6 +246,78 @@ message RouteAction { // for additional documentation. WeightedCluster weighted_clusters = 3; } + + message HashPolicy { + message Header { + // The name of the request header that will be used to obtain the hash + // key. If the request header is not present, no hash will be produced. + string header_name = 1; + + // If specified, the request header value will be rewritten and used + // to produce the hash key. + type.matcher.v3.RegexMatchAndSubstitute regex_rewrite = 2; + } + + message Cookie { + string name = 1; + } + + message ConnectionProperties { + bool source_ip = 1; + } + + message QueryParameter { + string name = 1; + } + + message FilterState { + // The name of the Object in the per-request filterState, which is an + // Envoy::Http::Hashable object. If there is no data associated with the key, + // or the stored object is not Envoy::Http::Hashable, no hash will be produced. + string key = 1; + } + + oneof policy_specifier { + // Header hash policy. + Header header = 1; + + // Cookie hash policy. + Cookie cookie = 2; + + // Connection properties hash policy. + ConnectionProperties connection_properties = 3; + + // Query parameter hash policy. + QueryParameter query_parameter = 5; + + // Filter state hash policy. + FilterState filter_state = 6; + } + + // The flag that short-circuits the hash computing. This field provides a + // 'fallback' style of configuration: "if a terminal policy doesn't work, + // fallback to rest of the policy list", it saves time when the terminal + // policy works. + // + // If true, and there is already a hash computed, ignore rest of the + // list of hash polices. 
+ // For example, if the following hash methods are configured: + // + // ========= ======== + // specifier terminal + // ========= ======== + // Header A true + // Header B false + // Header C false + // ========= ======== + // + // The generateHash process ends if policy "header A" generates a hash, as + // it's a terminal policy. + bool terminal = 4; + } + + repeated HashPolicy hash_policy = 15; + // Specifies the maximum stream duration for this route. MaxStreamDuration max_stream_duration = 36; } diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 2a80c54a9f9..025777830a2 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -32,6 +32,7 @@ #include "absl/functional/bind_front.h" #include "absl/memory/memory.h" #include "absl/strings/str_cat.h" +#include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "absl/types/optional.h" @@ -2095,6 +2096,14 @@ class XdsEnd2endTest : public ::testing::TestWithParam { return addresses; } + std::string CreateMetadataValueThatHashesToBackendPort(int port) { + return absl::StrCat(ipv6_only_ ? 
"::1" : "127.0.0.1", ":", port, "_0"); + } + + std::string CreateMetadataValueThatHashesToBackend(int index) { + return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port()); + } + void SetNextResolution( const std::vector& ports, grpc_core::FakeResolverResponseGenerator* response_generator = nullptr) { @@ -2236,6 +2245,13 @@ class XdsEnd2endTest : public ::testing::TestWithParam { return listener; } + AdsServiceImpl::EdsResourceArgs::Endpoint CreateEndpoint( + size_t backend_idx, HealthStatus health_status = HealthStatus::UNKNOWN, + int lb_weight = 1) { + return AdsServiceImpl::EdsResourceArgs::Endpoint( + backends_[backend_idx]->port(), health_status, lb_weight); + } + std::vector CreateEndpointsForBackends(size_t start_index = 0, size_t stop_index = 0, HealthStatus health_status = HealthStatus::UNKNOWN, @@ -2243,7 +2259,7 @@ class XdsEnd2endTest : public ::testing::TestWithParam { if (stop_index == 0) stop_index = backends_.size(); std::vector endpoints; for (size_t i = start_index; i < stop_index; ++i) { - endpoints.emplace_back(backends_[i]->port(), health_status, lb_weight); + endpoints.emplace_back(CreateEndpoint(i, health_status, lb_weight)); } return endpoints; } @@ -2272,6 +2288,11 @@ class XdsEnd2endTest : public ::testing::TestWithParam { locality.endpoints[i].health_status != HealthStatus::UNKNOWN) { lb_endpoints->set_health_status(locality.endpoints[i].health_status); } + if (locality.endpoints.size() > i && + locality.endpoints[i].lb_weight >= 1) { + lb_endpoints->mutable_load_balancing_weight()->set_value( + locality.endpoints[i].lb_weight); + } auto* endpoint = lb_endpoints->mutable_endpoint(); auto* address = endpoint->mutable_address(); auto* socket_address = address->mutable_socket_address(); @@ -2766,8 +2787,8 @@ TEST_P(BasicTest, IgnoresUnhealthyEndpoints) { } } -// Tests that subchannel sharing works when the same backend is listed multiple -// times. 
+// Tests that subchannel sharing works when the same backend is listed +// multiple times. TEST_P(BasicTest, SameBackendListedMultipleTimes) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -2852,8 +2873,8 @@ TEST_P(BasicTest, AllServersUnreachableFailFast) { EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code()); } -// Tests that RPCs fail when the backends are down, and will succeed again after -// the backends are restarted. +// Tests that RPCs fail when the backends are down, and will succeed again +// after the backends are restarted. TEST_P(BasicTest, BackendsRestart) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -2867,13 +2888,13 @@ TEST_P(BasicTest, BackendsRestart) { ShutdownAllBackends(); // Sending multiple failed requests instead of just one to ensure that the // client notices that all backends are down before we restart them. If we - // didn't do this, then a single RPC could fail here due to the race condition - // between the LB pick and the GOAWAY from the chosen backend being shut down, - // which would not actually prove that the client noticed that all of the - // backends are down. Then, when we send another request below (which we - // expect to succeed), if the callbacks happen in the wrong order, the same - // race condition could happen again due to the client not yet having noticed - // that the backends were all down. + // didn't do this, then a single RPC could fail here due to the race + // condition between the LB pick and the GOAWAY from the chosen backend + // being shut down, which would not actually prove that the client noticed + // that all of the backends are down. Then, when we send another request + // below (which we expect to succeed), if the callbacks happen in the wrong + // order, the same race condition could happen again due to the client not + // yet having noticed that the backends were all down. CheckRpcSendFailure(num_backends_); // Restart all backends. 
RPCs should start succeeding again. StartAllBackends(); @@ -3501,7 +3522,8 @@ TEST_P(LdsTest, RdsMissingConfigSource) { } // Tests that LDS client should send a NACK if the rds message in the -// http_connection_manager has a config_source field that does not specify ADS. +// http_connection_manager has a config_source field that does not specify +// ADS. TEST_P(LdsTest, RdsConfigSourceDoesNotSpecifyAds) { auto listener = default_listener_; HttpConnectionManager http_connection_manager; @@ -3519,10 +3541,9 @@ TEST_P(LdsTest, RdsConfigSourceDoesNotSpecifyAds) { const auto response_state = balancers_[0]->ads_service()->lds_response_state(); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); - EXPECT_THAT( - response_state.error_message, - ::testing::HasSubstr( - "HttpConnectionManager ConfigSource for RDS does not specify ADS.")); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr("HttpConnectionManager ConfigSource for " + "RDS does not specify ADS.")); } // Tests that we ignore filters after the router filter. @@ -3893,8 +3914,8 @@ TEST_P(LdsRdsTest, NoMatchedDomain) { EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED); } -// Tests that LDS client should choose the virtual host with matching domain if -// multiple virtual hosts exist in the LDS response. +// Tests that LDS client should choose the virtual host with matching domain +// if multiple virtual hosts exist in the LDS response. 
TEST_P(LdsRdsTest, ChooseMatchedDomain) { RouteConfiguration route_config = default_route_config_; *(route_config.add_virtual_hosts()) = route_config.virtual_hosts(0); @@ -4233,10 +4254,9 @@ TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasEmptyClusterName) { CheckRpcSendFailure(); const auto response_state = RouteConfigurationResponseState(0); EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); - EXPECT_THAT( - response_state.error_message, - ::testing::HasSubstr( - "RouteAction weighted_cluster cluster contains empty cluster name.")); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr("RouteAction weighted_cluster cluster " + "contains empty cluster name.")); } TEST_P(LdsRdsTest, RouteActionWeightedTargetClusterHasNoWeight) { @@ -4992,8 +5012,8 @@ TEST_P(LdsRdsTest, XdsRoutingWeightedClusterUpdateWeights) { // Change Route Configurations: same clusters different weights. weighted_cluster1->mutable_weight()->set_value(kWeight50); weighted_cluster2->mutable_weight()->set_value(kWeight50); - // Change default route to a new cluster to help to identify when new polices - // are seen by the client. + // Change default route to a new cluster to help to identify when new + // polices are seen by the client. default_route->mutable_route()->set_cluster(kNewCluster3Name); SetRouteConfiguration(0, new_route_config); ResetBackendCounters(); @@ -5258,8 +5278,8 @@ TEST_P(LdsRdsTest, XdsRoutingClusterUpdateClustersWithPickingDelays) { new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); default_route->mutable_route()->set_cluster(kNewClusterName); SetRouteConfiguration(0, new_route_config); - // Wait for RPCs to go to the new backend: 1, this ensures that the client has - // processed the update. + // Wait for RPCs to go to the new backend: 1, this ensures that the client + // has processed the update. 
WaitForBackend( 1, WaitForBackendOptions().set_reset_counters(false).set_allow_failures( true)); @@ -6648,8 +6668,8 @@ TEST_P(CdsTest, AggregateClusterTypeDisabled) { ::testing::HasSubstr("DiscoveryType is not valid.")); } -// Tests that CDS client should send a NACK if the cluster type in CDS response -// is unsupported. +// Tests that CDS client should send a NACK if the cluster type in CDS +// response is unsupported. TEST_P(CdsTest, UnsupportedClusterType) { auto cluster = default_cluster_; cluster.set_type(Cluster::STATIC); @@ -6695,8 +6715,8 @@ TEST_P(CdsTest, MultipleBadResources) { kClusterName2, ": DiscoveryType is not valid.")))); } -// Tests that CDS client should send a NACK if the eds_config in CDS response is -// other than ADS. +// Tests that CDS client should send a NACK if the eds_config in CDS response +// is other than ADS. TEST_P(CdsTest, WrongEdsConfig) { auto cluster = default_cluster_; cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_self(); @@ -6711,8 +6731,8 @@ TEST_P(CdsTest, WrongEdsConfig) { ::testing::HasSubstr("EDS ConfigSource is not ADS.")); } -// Tests that CDS client should send a NACK if the lb_policy in CDS response is -// other than ROUND_ROBIN. +// Tests that CDS client should send a NACK if the lb_policy in CDS response +// is other than ROUND_ROBIN. TEST_P(CdsTest, WrongLbPolicy) { auto cluster = default_cluster_; cluster.set_lb_policy(Cluster::LEAST_REQUEST); @@ -6727,8 +6747,8 @@ TEST_P(CdsTest, WrongLbPolicy) { ::testing::HasSubstr("LB policy is not supported.")); } -// Tests that CDS client should send a NACK if the lrs_server in CDS response is -// other than SELF. +// Tests that CDS client should send a NACK if the lrs_server in CDS response +// is other than SELF. 
TEST_P(CdsTest, WrongLrsServer) { auto cluster = default_cluster_; cluster.mutable_lrs_server()->mutable_ads(); @@ -6743,6 +6763,801 @@ TEST_P(CdsTest, WrongLrsServer) { ::testing::HasSubstr("LRS ConfigSource is not self.")); } +// Tests that ring hash policy that hashes using channel id ensures all RPCs +// to go 1 particular backend. +TEST_P(CdsTest, RingHashChannelIdHashing) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendOk(100); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Tests that ring hash policy that hashes using a header value can spread +// RPCs across all the backends. 
+TEST_P(CdsTest, RingHashHeaderHashing) {
+  gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true");
+  auto cluster = default_cluster_;
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  auto new_route_config = default_route_config_;
+  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
+  auto* hash_policy = route->mutable_route()->add_hash_policy();
+  hash_policy->mutable_header()->set_header_name("address_hash");
+  SetListenerAndRouteConfiguration(0, default_listener_, new_route_config);
+  AdsServiceImpl::EdsResourceArgs args({
+      {"locality0", CreateEndpointsForBackends()},
+  });
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // Note each type of RPC will contain a header value that will always be
+  // hashed to a specific backend as the header value matches the value used
+  // to create the entry in the ring.
+ std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + std::vector> metadata1 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; + std::vector> metadata2 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; + std::vector> metadata3 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); + const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); + const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + WaitForBackend(1, WaitForBackendOptions(), rpc_options1); + WaitForBackend(2, WaitForBackendOptions(), rpc_options2); + WaitForBackend(3, WaitForBackendOptions(), rpc_options3); + CheckRpcSendOk(100, rpc_options); + CheckRpcSendOk(100, rpc_options1); + CheckRpcSendOk(100, rpc_options2); + CheckRpcSendOk(100, rpc_options3); + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(100, backends_[i]->backend_service()->request_count()); + } + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Tests that ring hash policy that hashes using a header value and regex +// rewrite to aggregate RPCs to 1 backend. 
+TEST_P(CdsTest, RingHashHeaderHashingWithRegexRewrite) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + hash_policy->mutable_header() + ->mutable_regex_rewrite() + ->mutable_pattern() + ->set_regex("[0-9]+"); + hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution( + "foo"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + std::vector> metadata1 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(1)}}; + std::vector> metadata2 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(2)}}; + std::vector> metadata3 = { + {"address_hash", CreateMetadataValueThatHashesToBackend(3)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + const auto rpc_options1 = RpcOptions().set_metadata(std::move(metadata1)); + const auto rpc_options2 = RpcOptions().set_metadata(std::move(metadata2)); + const auto rpc_options3 = RpcOptions().set_metadata(std::move(metadata3)); + CheckRpcSendOk(100, rpc_options); + CheckRpcSendOk(100, rpc_options1); + CheckRpcSendOk(100, rpc_options2); + CheckRpcSendOk(100, rpc_options3); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + 
EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Tests that ring hash policy that hashes using a random value. +TEST_P(CdsTest, RingHashNoHashPolicy) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + AdsServiceImpl::EdsResourceArgs args( + {{"locality0", CreateEndpointsForBackends(0, 2)}}); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that ring hash policy evaluation will continue past the terminal +// policy if no results are produced yet. 
+TEST_P(CdsTest, RingHashContinuesPastTerminalPolicyThatDoesNotProduceResult) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("header_not_present"); + hash_policy->set_terminal(true); + auto* hash_policy2 = route->mutable_route()->add_hash_policy(); + hash_policy2->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args( + {{"locality0", CreateEndpointsForBackends(0, 2)}}); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + CheckRpcSendOk(100, rpc_options); + EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100); + EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test random hash is used when header hashing specified a header field that +// the RPC did not have. +TEST_P(CdsTest, RingHashOnHeaderThatIsNotPresent) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. 
+ cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("header_not_present"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args( + {{"locality0", CreateEndpointsForBackends(0, 2)}}); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"unmatched_header", absl::StrFormat("%" PRIu32, rand())}, + }; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. + WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs, rpc_options); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test random hash is used when only unsupported hash policies are +// configured. 
+TEST_P(CdsTest, RingHashUnsupportedHashPolicyDefaultToRandomHashing) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const double kDistribution50Percent = 0.5; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); + auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( + true); + auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_3->mutable_query_parameter()->set_name( + "query_parameter"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args( + {{"locality0", CreateEndpointsForBackends(0, 2)}}); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. 
+ WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int request_count_1 = backends_[0]->backend_service()->request_count(); + const int request_count_2 = backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(request_count_1) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(request_count_2) / kNumRpcs, + ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance)); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Tests that ring hash policy that hashes using a random value can spread +// RPCs across all the backends according to locality weight. +TEST_P(CdsTest, RingHashRandomHashingDistributionAccordingToEndpointWeight) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const size_t kWeight1 = 1; + const size_t kWeight2 = 2; + const size_t kWeightTotal = kWeight1 + kWeight2; + const double kWeight33Percent = static_cast(kWeight1) / kWeightTotal; + const double kWeight66Percent = static_cast(kWeight2) / kWeightTotal; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 100000); + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + AdsServiceImpl::EdsResourceArgs args( + {{"locality0", + {CreateEndpoint(0, HealthStatus::UNKNOWN, 1), + CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}}); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + // TODO(donnadionne): remove extended timeout after ring creation + // optimization. 
+ WaitForAllBackends(0, 2, WaitForBackendOptions(), + RpcOptions().set_timeout_ms(kRpcTimeoutMs)); + CheckRpcSendOk(kNumRpcs); + const int weight_33_request_count = + backends_[0]->backend_service()->request_count(); + const int weight_66_request_count = + backends_[1]->backend_service()->request_count(); + EXPECT_THAT(static_cast(weight_33_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight33Percent, kErrorTolerance)); + EXPECT_THAT(static_cast(weight_66_request_count) / kNumRpcs, + ::testing::DoubleNear(kWeight66Percent, kErrorTolerance)); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Tests that ring hash policy that hashes using a random value can spread +// RPCs across all the backends according to locality weight. +TEST_P(CdsTest, + RingHashRandomHashingDistributionAccordingToLocalityAndEndpointWeight) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const size_t kWeight1 = 1 * 1; + const size_t kWeight2 = 2 * 2; + const size_t kWeightTotal = kWeight1 + kWeight2; + const double kWeight20Percent = static_cast(kWeight1) / kWeightTotal; + const double kWeight80Percent = static_cast(kWeight2) / kWeightTotal; + const double kErrorTolerance = 0.05; + const uint32_t kRpcTimeoutMs = 10000; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance); + auto cluster = default_cluster_; + // Increasing min ring size for random distribution. 
+  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
+      100000);
+  cluster.set_lb_policy(Cluster::RING_HASH);
+  balancers_[0]->ads_service()->SetCdsResource(cluster);
+  AdsServiceImpl::EdsResourceArgs args(
+      {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
+       {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
+  balancers_[0]->ads_service()->SetEdsResource(
+      BuildEdsResource(args, DefaultEdsServiceName()));
+  SetNextResolutionForLbChannelAllBalancers();
+  // TODO(donnadionne): remove extended timeout after ring creation
+  // optimization.
+  WaitForAllBackends(0, 2, WaitForBackendOptions(),
+                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
+  CheckRpcSendOk(kNumRpcs);
+  const int weight_20_request_count =
+      backends_[0]->backend_service()->request_count();
+  const int weight_80_request_count =
+      backends_[1]->backend_service()->request_count();
+  EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
+  EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
+              ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
+  gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH");
+}
+
+// Tests round robin is not impacted by the endpoint weight, and that the
+// localities in a locality map are picked according to their weights.
+TEST_P(CdsTest, RingHashEndpointWeightDoesNotImpactWeightedRoundRobin) { + SetNextResolution({}); + SetNextResolutionForLbChannelAllBalancers(); + const int kLocalityWeight0 = 2; + const int kLocalityWeight1 = 8; + const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1; + const double kLocalityWeightRate0 = + static_cast(kLocalityWeight0) / kTotalLocalityWeight; + const double kLocalityWeightRate1 = + static_cast(kLocalityWeight1) / kTotalLocalityWeight; + const double kErrorTolerance = 0.05; + const size_t kNumRpcs = + ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance); + // ADS response contains 2 localities, each of which contains 1 backend. + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", + {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)}, + kLocalityWeight0}, + {"locality1", + {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, + kLocalityWeight1}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + // Wait for both backends to be ready. + WaitForAllBackends(0, 2); + // Send kNumRpcs RPCs. + CheckRpcSendOk(kNumRpcs); + // The locality picking rates should be roughly equal to the expectation. + const double locality_picked_rate_0 = + static_cast(backends_[0]->backend_service()->request_count()) / + kNumRpcs; + const double locality_picked_rate_1 = + static_cast(backends_[1]->backend_service()->request_count()) / + kNumRpcs; + EXPECT_THAT(locality_picked_rate_0, + ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance)); + EXPECT_THAT(locality_picked_rate_1, + ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance)); +} + +// Tests that ring hash policy that hashes using a fixed string ensures all +// RPCs to go 1 particular backend; and that subsequent hashing policies are +// ignored due to the setting of terminal. 
+TEST_P(CdsTest, RingHashFixedHashingTerminalPolicy) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("fixed_string"); + hash_policy->set_terminal(true); + auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy(); + hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"fixed_string", "fixed_value"}, + {"random_string", absl::StrFormat("%" PRIu32, rand())}, + }; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + CheckRpcSendOk(100, rpc_options); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that the channel will go from idle to ready via connecting; +// (tho it is not possible to catch the connecting state before moving to +// ready) +TEST_P(CdsTest, RingHashIdleToReady) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + 
balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + CheckRpcSendOk(); + EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false)); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that when the first pick is down leading to a transient failure, we +// will move on to the next ring hash entry. +TEST_P(CdsTest, RingHashTransientFailureCheckNextOne) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + std::vector endpoints; + const int unused_port = grpc_pick_unused_port_or_die(); + endpoints.emplace_back(unused_port); + endpoints.emplace_back(backends_[1]->port()); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", std::move(endpoints)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + 
{"address_hash", + CreateMetadataValueThatHashesToBackendPort(unused_port)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + WaitForBackend(1, WaitForBackendOptions(), rpc_options); + CheckRpcSendOk(100, rpc_options); + EXPECT_EQ(0, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(100, backends_[1]->backend_service()->request_count()); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that when a backend goes down, we will move on to the next subchannel +// (with a lower priority). When the backend comes back up, traffic will move +// back. +TEST_P(CdsTest, RingHashSwitchToLowerPrioirtyAndThenBack) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight, + 0}, + {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight, + 1}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + ShutdownBackend(0); + WaitForBackend(1, WaitForBackendOptions().set_allow_failures(true), + rpc_options); + StartBackend(0); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + CheckRpcSendOk(100, 
rpc_options); + EXPECT_EQ(100, backends_[0]->backend_service()->request_count()); + EXPECT_EQ(0, backends_[1]->backend_service()->request_count()); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that when all backends are down, we will keep reattempting. +TEST_P(CdsTest, RingHashAllFailReattempt) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const uint32_t kConnectionTimeoutMilliseconds = 5000; + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + std::vector endpoints; + endpoints.emplace_back(grpc_pick_unused_port_or_die()); + endpoints.emplace_back(backends_[1]->port()); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", std::move(endpoints)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + ShutdownBackend(1); + CheckRpcSendFailure(1, rpc_options); + StartBackend(1); + // Ensure we are actively connecting without any traffic. + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test that when all backends are down and then up, we may pick a TF backend +// and we will then jump to ready backend. 
+TEST_P(CdsTest, RingHashTransientFailureSkipToAvailableReady) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + const uint32_t kConnectionTimeoutMilliseconds = 5000; + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_header()->set_header_name("address_hash"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + std::vector endpoints; + // Make sure we include some unused ports to fill the ring. + endpoints.emplace_back(backends_[0]->port()); + endpoints.emplace_back(backends_[1]->port()); + endpoints.emplace_back(grpc_pick_unused_port_or_die()); + endpoints.emplace_back(grpc_pick_unused_port_or_die()); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", std::move(endpoints)}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + std::vector> metadata = { + {"address_hash", CreateMetadataValueThatHashesToBackend(0)}}; + const auto rpc_options = RpcOptions().set_metadata(std::move(metadata)); + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + ShutdownBackend(0); + ShutdownBackend(1); + CheckRpcSendFailure(1, rpc_options); + EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false)); + // Bring up 0, should be picked as the RPC is hashed to it. + StartBackend(0); + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); + WaitForBackend(0, WaitForBackendOptions(), rpc_options); + // Bring down 0 and bring up 1. + // Note the RPC contains a header value that will always be hashed to + // backend 0. 
So by purposely bring down backend 0 and bring up another + // backend, this will ensure Picker's first choice of backend 0 will fail + // and it will + // 1. reattempt backend 0 and + // 2. go through the remaining subchannels to find one in READY. + // Since the the entries in the ring is pretty distributed and we have + // unused ports to fill the ring, it is almost guaranteed that the Picker + // will go through some non-READY entries and skip them as per design. + ShutdownBackend(0); + CheckRpcSendFailure(1, rpc_options); + StartBackend(1); + EXPECT_TRUE(channel_->WaitForConnected( + grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds))); + WaitForBackend(1, WaitForBackendOptions(), rpc_options); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test unspported hash policy types are all ignored before a supported +// policy. +TEST_P(CdsTest, RingHashUnsupportedHashPolicyUntilChannelIdHashing) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_1->mutable_cookie()->set_name("cookie"); + auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip( + true); + auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy(); + hash_policy_unsupported_3->mutable_query_parameter()->set_name( + "query_parameter"); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs 
args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendOk(100); + bool found = false; + for (size_t i = 0; i < backends_.size(); ++i) { + if (backends_[i]->backend_service()->request_count() > 0) { + EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100) + << "backend " << i; + EXPECT_FALSE(found) << "backend " << i; + found = true; + } + } + EXPECT_TRUE(found); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test we nack when ring hash policy has invalid hash function (something +// other than XX_HASH. +TEST_P(CdsTest, RingHashPolicyHasInvalidHashFunction) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->set_hash_function( + Cluster::RingHashLbConfig::MURMUR_HASH_2); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT( + response_state.error_message, + ::testing::HasSubstr("ring hash lb config has invalid hash function.")); + 
gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test we nack when ring hash policy has invalid ring size. +TEST_P(CdsTest, RingHashPolicyHasInvalidMinimumRingSize) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 0); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr( + "min_ring_size is not in the range of 1 to 8388608.")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test we nack when ring hash policy has invalid ring size. 
+TEST_P(CdsTest, RingHashPolicyHasInvalidMaxmumRingSize) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( + 8388609); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr( + "max_ring_size is not in the range of 1 to 8388608.")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + +// Test we nack when ring hash policy has invalid ring size. 
+TEST_P(CdsTest, RingHashPolicyHasInvalidRingSizeMinGreaterThanMax) { + gpr_setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", "true"); + auto cluster = default_cluster_; + cluster.set_lb_policy(Cluster::RING_HASH); + cluster.mutable_ring_hash_lb_config()->mutable_maximum_ring_size()->set_value( + 5000); + cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value( + 5001); + balancers_[0]->ads_service()->SetCdsResource(cluster); + auto new_route_config = default_route_config_; + auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0); + auto* hash_policy = route->mutable_route()->add_hash_policy(); + hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id"); + SetListenerAndRouteConfiguration(0, default_listener_, new_route_config); + AdsServiceImpl::EdsResourceArgs args({ + {"locality0", CreateEndpointsForBackends()}, + }); + balancers_[0]->ads_service()->SetEdsResource( + BuildEdsResource(args, DefaultEdsServiceName())); + SetNextResolutionForLbChannelAllBalancers(); + CheckRpcSendFailure(); + const auto response_state = + balancers_[0]->ads_service()->cds_response_state(); + EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::NACKED); + EXPECT_THAT(response_state.error_message, + ::testing::HasSubstr( + "min_ring_size cannot be greater than max_ring_size.")); + gpr_unsetenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"); +} + class XdsSecurityTest : public BasicTest { protected: static void SetUpTestCase() { @@ -6760,8 +7575,8 @@ class XdsSecurityTest : public BasicTest { root_cert_ = ReadFile(kCaCertPath); bad_root_cert_ = ReadFile(kBadClientCertPath); identity_pair_ = ReadTlsIdentityPair(kClientKeyPath, kClientCertPath); - // TODO(yashykt): Use different client certs here instead of reusing server - // certs after https://github.com/grpc/grpc/pull/24876 is merged + // TODO(yashykt): Use different client certs here instead of reusing + // server certs after https://github.com/grpc/grpc/pull/24876 is merged 
fallback_identity_pair_ = ReadTlsIdentityPair(kServerKeyPath, kServerCertPath); bad_identity_pair_ = @@ -7562,7 +8377,8 @@ TEST_P(XdsEnabledServerTest, EXPECT_EQ(response_state.state, AdsServiceImpl::ResponseState::ACKED); } -// Verify that a mismatch of listening address results in "not serving" status. +// Verify that a mismatch of listening address results in "not serving" +// status. TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { Listener listener; listener.set_name( @@ -8421,8 +9237,8 @@ TEST_P(XdsServerFilterChainMatchTest, HttpConnectionManager()); filter_chain->mutable_filter_chain_match()->add_application_protocols("h2"); balancers_[0]->ads_service()->SetLdsResource(listener); - // A successful RPC proves that filter chains that mention "raw_buffer" as the - // transport protocol are chosen as the best match in the round. + // A successful RPC proves that filter chains that mention "raw_buffer" as + // the transport protocol are chosen as the best match in the round. SendRpc([this]() { return CreateInsecureChannel(); }, {}, {}); } @@ -8449,8 +9265,8 @@ TEST_P(XdsServerFilterChainMatchTest, prefix_range->set_address_prefix(ipv6_only_ ? "::1" : "127.0.0.1"); prefix_range->mutable_prefix_len()->set_value(16); filter_chain->mutable_filter_chain_match()->add_server_names("server_name"); - // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is the - // highest match, it should be chosen. + // Add filter chain with two prefix ranges (length 8 and 24). Since 24 is + // the highest match, it should be chosen. filter_chain = listener.add_filter_chains(); filter_chain->add_filters()->mutable_typed_config()->PackFrom( HttpConnectionManager()); @@ -8462,7 +9278,8 @@ TEST_P(XdsServerFilterChainMatchTest, filter_chain->mutable_filter_chain_match()->add_prefix_ranges(); prefix_range->set_address_prefix(ipv6_only_ ? 
"::1" : "127.0.0.1"); prefix_range->mutable_prefix_len()->set_value(24); - // Add another filter chain with a non-matching prefix range (with length 30) + // Add another filter chain with a non-matching prefix range (with length + // 30) filter_chain = listener.add_filter_chains(); filter_chain->add_filters()->mutable_typed_config()->PackFrom( HttpConnectionManager()); @@ -8528,10 +9345,10 @@ TEST_P(XdsServerFilterChainMatchTest, auto* socket_address = listener.mutable_address()->mutable_socket_address(); socket_address->set_address(ipv6_only_ ? "::1" : "127.0.0.1"); socket_address->set_port_value(backends_[0]->port()); - // Add filter chain with source prefix range (length 16) but with a bad source - // port mentioned. (Prefix range is matched first.) - // Note that backends_[0]->port() will never be a match for the source port - // because it is already being used by a backend. + // Add filter chain with source prefix range (length 16) but with a bad + // source port mentioned. (Prefix range is matched first.) Note that + // backends_[0]->port() will never be a match for the source port because it + // is already being used by a backend. auto* filter_chain = listener.add_filter_chains(); filter_chain->add_filters()->mutable_typed_config()->PackFrom( HttpConnectionManager()); @@ -8545,8 +9362,8 @@ TEST_P(XdsServerFilterChainMatchTest, source_prefix_range->mutable_prefix_len()->set_value(16); filter_chain->mutable_filter_chain_match()->add_source_ports( backends_[0]->port()); - // Add filter chain with two source prefix ranges (length 8 and 24). Since 24 - // is the highest match, it should be chosen. + // Add filter chain with two source prefix ranges (length 8 and 24). Since + // 24 is the highest match, it should be chosen. 
filter_chain = listener.add_filter_chains(); filter_chain->add_filters()->mutable_typed_config()->PackFrom( HttpConnectionManager()); @@ -8716,8 +9533,8 @@ TEST_P(XdsServerFilterChainMatchTest, DuplicateMatchOnTransportProtocolNacked) { HttpConnectionManager()); filter_chain->mutable_filter_chain_match()->set_transport_protocol( "raw_buffer"); - // Add a duplicate filter chain with the same "raw_buffer" transport protocol - // entry + // Add a duplicate filter chain with the same "raw_buffer" transport + // protocol entry filter_chain = listener.add_filter_chains(); filter_chain->add_filters()->mutable_typed_config()->PackFrom( HttpConnectionManager()); @@ -9083,8 +9900,8 @@ TEST_P(LocalityMapTest, StressTest) { delayed_resource_setter.join(); } -// Tests that the localities in a locality map are picked correctly after update -// (addition, modification, deletion). +// Tests that the localities in a locality map are picked correctly after +// update (addition, modification, deletion). TEST_P(LocalityMapTest, UpdateMap) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -9148,8 +9965,8 @@ TEST_P(LocalityMapTest, UpdateMap) { BuildEdsResource(args, DefaultEdsServiceName())); // Backend 3 hasn't received any request. EXPECT_EQ(0U, backends_[3]->backend_service()->request_count()); - // Wait until the locality update has been processed, as signaled by backend 3 - // receiving a request. + // Wait until the locality update has been processed, as signaled by backend + // 3 receiving a request. WaitForAllBackends(3, 4); gpr_log(GPR_INFO, "========= BEFORE SECOND BATCH =========="); // Send kNumRpcs RPCs. @@ -9268,8 +10085,8 @@ TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) { EXPECT_EQ(0, std::get<1>(counts)); } -// If the higher priority localities are not reachable, failover to the highest -// priority among the rest. +// If the higher priority localities are not reachable, failover to the +// highest priority among the rest. 
TEST_P(FailoverTest, Failover) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -9372,8 +10189,8 @@ TEST_P(FailoverTest, UpdateInitialUnavailable) { delayed_resource_setter.join(); } -// Tests that after the localities' priorities are updated, we still choose the -// highest READY priority with the updated localities. +// Tests that after the localities' priorities are updated, we still choose +// the highest READY priority with the updated localities. TEST_P(FailoverTest, UpdatePriority) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -9624,8 +10441,8 @@ TEST_P(DropTest, Update) { {kThrottleDropType, kDropPerMillionForThrottle}}; balancers_[0]->ads_service()->SetEdsResource( BuildEdsResource(args, DefaultEdsServiceName())); - // Wait until the drop rate increases to the middle of the two configs, which - // implies that the update has been in effect. + // Wait until the drop rate increases to the middle of the two configs, + // which implies that the update has been in effect. const double kDropRateThreshold = (kDropRateForLb + kDropRateForLbAndThrottle) / 2; size_t num_rpcs = kNumRpcsBoth; @@ -9693,8 +10510,8 @@ class BalancerUpdateTest : public XdsEnd2endTest { BalancerUpdateTest() : XdsEnd2endTest(4, 3) {} }; -// Tests that the old LB call is still used after the balancer address update as -// long as that call is still alive. +// Tests that the old LB call is still used after the balancer address update +// as long as that call is still alive. TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -9752,10 +10569,10 @@ TEST_P(BalancerUpdateTest, UpdateBalancersButKeepUsingOriginalBalancer) { } // Tests that the old LB call is still used after multiple balancer address -// updates as long as that call is still alive. 
Send an update with the same set -// of LBs as the one in SetUp() in order to verify that the LB channel inside -// xds keeps the initial connection (which by definition is also present in the -// update). +// updates as long as that call is still alive. Send an update with the same +// set of LBs as the one in SetUp() in order to verify that the LB channel +// inside xds keeps the initial connection (which by definition is also +// present in the update). TEST_P(BalancerUpdateTest, Repeated) { SetNextResolution({}); SetNextResolutionForLbChannelAllBalancers(); @@ -10187,9 +11004,9 @@ class FaultInjectionTest : public XdsEnd2endTest { public: FaultInjectionTest() : XdsEnd2endTest(1, 1) {} - // Builds a Listener with Fault Injection filter config. If the http_fault is - // nullptr, then assign an empty filter config. This filter config is required - // to enable the fault injection features. + // Builds a Listener with Fault Injection filter config. If the http_fault + // is nullptr, then assign an empty filter config. This filter config is + // required to enable the fault injection features. static Listener BuildListenerWithFaultInjection( const HTTPFault& http_fault = HTTPFault()) { HttpConnectionManager http_connection_manager; @@ -10501,9 +11318,9 @@ TEST_P(FaultInjectionTest, XdsFaultInjectionAlwaysDelayPercentageAbort) { ::testing::DoubleNear(kAbortRate, kErrorTolerance)); } -// This test and the above test apply different denominators to delay and abort. -// This ensures that we are using the right denominator for each injected fault -// in our code. +// This test and the above test apply different denominators to delay and +// abort. This ensures that we are using the right denominator for each +// injected fault in our code. TEST_P(FaultInjectionTest, XdsFaultInjectionAlwaysDelayPercentageAbortSwitchDenominator) { const uint32_t kAbortPercentagePerMillion = 500000;