diff --git a/Package.swift b/Package.swift index 283ae691bee..7e04d9f9be8 100644 --- a/Package.swift +++ b/Package.swift @@ -175,7 +175,6 @@ let package = Package( "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h", "src/core/ext/filters/client_channel/lb_policy/rls/rls.cc", "src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc", - "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h", "src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc", "src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h", "src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 2900769a832..be0992812db 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -244,7 +244,6 @@ libs: - src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h - src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h - src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h - - src/core/ext/filters/client_channel/lb_policy/subchannel_list.h - src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h - src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 2d6ad2ce956..acb834679b5 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -276,7 +276,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h', @@ -1339,7 +1338,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 54927792021..f69b95d25d5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -276,7 +276,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', 'src/core/ext/filters/client_channel/lb_policy/rls/rls.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc', @@ -2089,7 +2088,6 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h', 'src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.h', 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h', 'src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h', diff --git a/grpc.gemspec b/grpc.gemspec index 423d2053a4a..4a5124ef2aa 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -181,7 +181,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/rls/rls.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc ) - s.files += %w( src/core/ext/filters/client_channel/lb_policy/subchannel_list.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc ) diff --git a/package.xml b/package.xml index 5560354cd06..c9bace71f3e 100644 --- a/package.xml +++ b/package.xml @@ -163,7 +163,6 @@ - diff --git a/src/core/BUILD b/src/core/BUILD index adc94513fd5..cd3f7ea7c7b 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4582,34 +4582,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "grpc_lb_subchannel_list", - hdrs = [ - "ext/filters/client_channel/lb_policy/subchannel_list.h", - ], - external_deps = [ - "absl/status", - "absl/types:optional", - ], - language = "c++", - deps = [ - "channel_args", - "dual_ref_counted", - "gpr_manual_constructor", - "health_check_client", - "iomgr_fwd", - "lb_policy", - "subchannel_interface", - "//:debug_location", - "//:gpr", - "//:grpc_base", - "//:grpc_client_channel", - "//:ref_counted_ptr", - "//:server_address", - "//:work_serializer", - ], -) - grpc_cc_library( name = "lb_endpoint_list", srcs = [ @@ -4707,16 +4679,18 @@ grpc_cc_library( deps = [ "channel_args", "closure", + "delegating_helper", "error", - "grpc_lb_subchannel_list", + "grpc_lb_policy_pick_first", "grpc_service_config", "json", "json_args", "json_object_loader", "lb_policy", "lb_policy_factory", + "lb_policy_registry", + "pollset_set", "ref_counted", - "subchannel_interface", "unique_type_name", "validation_errors", "//:config", diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h index ca68193f8d8..66fce2871e4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h @@ -91,6 +91,8 @@ class MyEndpointList : public EndpointList { } }; */ +// TODO(roth): Consider wrapping this in an LB policy subclass for petiole +// policies to inherit from. class EndpointList : public InternallyRefCounted { public: // An individual endpoint. diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index f5767e4679f..af67078d0e0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -37,6 +38,8 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include + #define XXH_INLINE_ALL #include "xxhash.h" @@ -45,11 +48,12 @@ #include #include "src/core/ext/filters/client_channel/client_channel_internal.h" -#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" +#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" @@ -59,10 +63,12 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/load_balancing/delegating_helper.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" -#include "src/core/lib/load_balancing/subchannel_interface.h" +#include "src/core/lib/load_balancing/lb_policy_registry.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" @@ -140,201 +146,153 @@ class RingHash : public LoadBalancingPolicy { void ResetBackoffLocked() override; private: - // Forward declaration. - class RingHashSubchannelList; - - // Data for a particular subchannel in a subchannel list. - // This subclass adds the following functionality: - // - Tracks the previous connectivity state of the subchannel, so that - // we know how many subchannels are in each state. - class RingHashSubchannelData - : public SubchannelData { + // A ring computed based on a config and address list. + class Ring : public RefCounted { public: - RingHashSubchannelData( - SubchannelList* - subchannel_list, - const ServerAddress& address, - RefCountedPtr subchannel) - : SubchannelData(subchannel_list, address, std::move(subchannel)), - address_(address) {} - - const ServerAddress& address() const { return address_; } - - grpc_connectivity_state logical_connectivity_state() const { - return logical_connectivity_state_; - } - const absl::Status& logical_connectivity_status() const { - return logical_connectivity_status_; - } + struct RingEntry { + uint64_t hash; + size_t endpoint_index; // Index into RingHash::addresses_. + }; + + Ring(RingHash* ring_hash, RingHashLbConfig* config); + + const std::vector& ring() const { return ring_; } private: - // Performs connectivity state updates that need to be done only - // after we have started watching. - void ProcessConnectivityChangeLocked( - absl::optional old_state, - grpc_connectivity_state new_state) override; - - ServerAddress address_; - - // Last logical connectivity state seen. - // Note that this may differ from the state actually reported by the - // subchannel in some cases; for example, once this is set to - // TRANSIENT_FAILURE, we do not change it again until we get READY, - // so we skip any interim stops in CONNECTING. - grpc_connectivity_state logical_connectivity_state_ = GRPC_CHANNEL_IDLE; - absl::Status logical_connectivity_status_; + std::vector ring_; }; - // A list of subchannels and the ring containing those subchannels. - class RingHashSubchannelList - : public SubchannelList { + // State for a particular endpoint. Delegates to a pick_first child policy. + class RingHashEndpoint : public InternallyRefCounted { public: - class Ring : public RefCounted { - public: - struct RingEntry { - uint64_t hash; - size_t subchannel_index; - }; + // index is the index into RingHash::addresses_ of this endpoint. + RingHashEndpoint(RefCountedPtr ring_hash, size_t index) + : ring_hash_(std::move(ring_hash)), index_(index) {} - Ring(RingHashLbConfig* config, RingHashSubchannelList* subchannel_list, - const ChannelArgs& args); + void Orphan() override; - const std::vector& ring() const { return ring_; } + size_t index() const { return index_; } + void set_index(size_t index) { index_ = index; } - private: - std::vector ring_; - }; - - RingHashSubchannelList(RingHash* policy, ServerAddressList addresses, - const ChannelArgs& args); + grpc_connectivity_state connectivity_state() const { + return connectivity_state_; + } - ~RingHashSubchannelList() override { - RingHash* p = static_cast(policy()); - p->Unref(DEBUG_LOCATION, "subchannel_list"); + // Returns info about the endpoint to be stored in the picker. + struct EndpointInfo { + RefCountedPtr endpoint; + RefCountedPtr picker; + grpc_connectivity_state state; + absl::Status status; + }; + EndpointInfo GetInfoForPicker() { + return {Ref(), picker_, connectivity_state_, status_}; } - RefCountedPtr ring() { return ring_; } - - // Updates the counters of subchannels in each state when a - // subchannel transitions from old_state to new_state. - void UpdateStateCountersLocked(grpc_connectivity_state old_state, - grpc_connectivity_state new_state); - - // Updates the RH policy's connectivity state based on the - // subchannel list's state counters, creating new picker and new ring. - // The index parameter indicates the index into the list of the subchannel - // whose status report triggered the call to - // UpdateRingHashConnectivityStateLocked(). - // connection_attempt_complete is true if the subchannel just - // finished a connection attempt. - void UpdateRingHashConnectivityStateLocked(size_t index, - bool connection_attempt_complete, - absl::Status status); + void ResetBackoffLocked(); + + // If the child policy does not yet exist, creates it; otherwise, + // asks the child to exit IDLE. + void RequestConnectionLocked(); private: - std::shared_ptr work_serializer() const override { - return static_cast(policy())->work_serializer(); - } + class Helper; - size_t num_idle_; - size_t num_ready_ = 0; - size_t num_connecting_ = 0; - size_t num_transient_failure_ = 0; + void CreateChildPolicy(); - RefCountedPtr ring_; + // Called when the child policy reports a connectivity state update. + void OnStateUpdate(grpc_connectivity_state new_state, + const absl::Status& status, + RefCountedPtr picker); + + // Ref to our parent. + RefCountedPtr ring_hash_; + size_t index_; // Index into RingHash::addresses_ of this endpoint. - // The index of the subchannel currently doing an internally - // triggered connection attempt, if any. - absl::optional internally_triggered_connection_index_; + // The pick_first child policy. + OrphanablePtr child_policy_; - // TODO(roth): If we ever change the helper UpdateState() API to not - // need the status reported for TRANSIENT_FAILURE state (because - // it's not currently actually used for anything outside of the picker), - // then we will no longer need this data member. - absl::Status last_failure_; + grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; + absl::Status status_; + RefCountedPtr picker_; }; class Picker : public SubchannelPicker { public: - Picker(RefCountedPtr ring_hash_lb, - RingHashSubchannelList* subchannel_list) - : ring_hash_lb_(std::move(ring_hash_lb)), - ring_(subchannel_list->ring()) { - subchannels_.reserve(subchannel_list->num_subchannels()); - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - RingHashSubchannelData* subchannel_data = - subchannel_list->subchannel(i); - subchannels_.emplace_back( - SubchannelInfo{subchannel_data->subchannel()->Ref(), - subchannel_data->logical_connectivity_state(), - subchannel_data->logical_connectivity_status()}); + explicit Picker(RefCountedPtr ring_hash) + : ring_hash_(std::move(ring_hash)), + ring_(ring_hash_->ring_), + endpoints_(ring_hash_->addresses_.size()) { + for (const auto& p : ring_hash_->endpoint_map_) { + endpoints_[p.second->index()] = p.second->GetInfoForPicker(); } } PickResult Pick(PickArgs args) override; private: - // A fire-and-forget class that schedules subchannel connection attempts + // A fire-and-forget class that schedules endpoint connection attempts // on the control plane WorkSerializer. - class SubchannelConnectionAttempter : public Orphanable { + class EndpointConnectionAttempter { public: - explicit SubchannelConnectionAttempter( - RefCountedPtr ring_hash_lb) - : ring_hash_lb_(std::move(ring_hash_lb)) { - GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); - } - - void Orphan() override { + EndpointConnectionAttempter(RefCountedPtr ring_hash, + RefCountedPtr endpoint) + : ring_hash_(std::move(ring_hash)), endpoint_(std::move(endpoint)) { // Hop into ExecCtx, so that we're not holding the data plane mutex // while we run control-plane code. + GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus()); } - void AddSubchannel(RefCountedPtr subchannel) { - subchannels_.push_back(std::move(subchannel)); - } - private: static void RunInExecCtx(void* arg, grpc_error_handle /*error*/) { - auto* self = static_cast(arg); - self->ring_hash_lb_->work_serializer()->Run( + auto* self = static_cast(arg); + self->ring_hash_->work_serializer()->Run( [self]() { - if (!self->ring_hash_lb_->shutdown_) { - for (auto& subchannel : self->subchannels_) { - subchannel->RequestConnection(); - } + if (!self->ring_hash_->shutdown_) { + self->endpoint_->RequestConnectionLocked(); } delete self; }, DEBUG_LOCATION); } - RefCountedPtr ring_hash_lb_; + RefCountedPtr ring_hash_; + RefCountedPtr endpoint_; grpc_closure closure_; - std::vector> subchannels_; }; - struct SubchannelInfo { - RefCountedPtr subchannel; - grpc_connectivity_state state; - absl::Status status; - }; - - RefCountedPtr ring_hash_lb_; - RefCountedPtr ring_; - std::vector subchannels_; + RefCountedPtr ring_hash_; + RefCountedPtr ring_; + std::vector endpoints_; }; ~RingHash() override; void ShutdownLocked() override; - // Current config from resolver. - RefCountedPtr config_; + // Updates the aggregate policy's connectivity state based on the + // endpoint list's state counters, creating a new picker. + // entered_transient_failure is true if the endpoint has just + // entered TRANSIENT_FAILURE state. + // If the call to this method is triggered by an endpoint entering + // TRANSIENT_FAILURE, then status is the status reported by the endpoint. + void UpdateAggregatedConnectivityStateLocked(bool entered_transient_failure, + absl::Status status); + + // Current address list, channel args, and ring. + ServerAddressList addresses_; + ChannelArgs args_; + RefCountedPtr ring_; + + std::map> endpoint_map_; + + // TODO(roth): If we ever change the helper UpdateState() API to not + // need the status reported for TRANSIENT_FAILURE state (because + // it's not currently actually used for anything outside of the picker), + // then we will no longer need this data member. + absl::Status last_failure_; - // list of subchannels. - RefCountedPtr subchannel_list_; - RefCountedPtr latest_pending_subchannel_list_; // indicating if we are shutting down. bool shutdown_ = false; }; @@ -357,106 +315,62 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { absl::InternalError("ring hash value is not a number")); } const auto& ring = ring_->ring(); + // Find the index in the ring to use for this RPC. // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c // (ketama_get_server) NOTE: The algorithm depends on using signed integers - // for lowp, highp, and first_index. Do not change them! + // for lowp, highp, and index. Do not change them! int64_t lowp = 0; int64_t highp = ring.size(); - int64_t first_index = 0; + int64_t index = 0; while (true) { - first_index = (lowp + highp) / 2; - if (first_index == static_cast(ring.size())) { - first_index = 0; + index = (lowp + highp) / 2; + if (index == static_cast(ring.size())) { + index = 0; break; } - uint64_t midval = ring[first_index].hash; - uint64_t midval1 = first_index == 0 ? 0 : ring[first_index - 1].hash; + uint64_t midval = ring[index].hash; + uint64_t midval1 = index == 0 ? 0 : ring[index - 1].hash; if (h <= midval && h > midval1) { break; } if (midval < h) { - lowp = first_index + 1; + lowp = index + 1; } else { - highp = first_index - 1; + highp = index - 1; } if (lowp > highp) { - first_index = 0; + index = 0; break; } } - OrphanablePtr subchannel_connection_attempter; - auto ScheduleSubchannelConnectionAttempt = - [&](RefCountedPtr subchannel) { - if (subchannel_connection_attempter == nullptr) { - subchannel_connection_attempter = - MakeOrphanable(ring_hash_lb_->Ref( - DEBUG_LOCATION, "SubchannelConnectionAttempter")); - } - subchannel_connection_attempter->AddSubchannel(std::move(subchannel)); - }; - SubchannelInfo& first_subchannel = - subchannels_[ring[first_index].subchannel_index]; - switch (first_subchannel.state) { - case GRPC_CHANNEL_READY: - return PickResult::Complete(first_subchannel.subchannel); - case GRPC_CHANNEL_IDLE: - ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel); - ABSL_FALLTHROUGH_INTENDED; - case GRPC_CHANNEL_CONNECTING: - return PickResult::Queue(); - default: // GRPC_CHANNEL_TRANSIENT_FAILURE - break; - } - ScheduleSubchannelConnectionAttempt(first_subchannel.subchannel); - // Loop through remaining subchannels to find one in READY. - // On the way, we make sure the right set of connection attempts - // will happen. - bool found_second_subchannel = false; - bool found_first_non_failed = false; - for (size_t i = 1; i < ring.size(); ++i) { - const auto& entry = ring[(first_index + i) % ring.size()]; - if (entry.subchannel_index == ring[first_index].subchannel_index) { - continue; - } - SubchannelInfo& subchannel_info = subchannels_[entry.subchannel_index]; - if (subchannel_info.state == GRPC_CHANNEL_READY) { - return PickResult::Complete(subchannel_info.subchannel); - } - if (!found_second_subchannel) { - switch (subchannel_info.state) { - case GRPC_CHANNEL_IDLE: - ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); - ABSL_FALLTHROUGH_INTENDED; - case GRPC_CHANNEL_CONNECTING: - return PickResult::Queue(); - default: - break; - } - found_second_subchannel = true; - } - if (!found_first_non_failed) { - if (subchannel_info.state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); - } else { - if (subchannel_info.state == GRPC_CHANNEL_IDLE) { - ScheduleSubchannelConnectionAttempt(subchannel_info.subchannel); - } - found_first_non_failed = true; - } + // Find the first endpoint we can use from the selected index. + for (size_t i = 0; i < ring.size(); ++i) { + const auto& entry = ring[(index + i) % ring.size()]; + const auto& endpoint_info = endpoints_[entry.endpoint_index]; + switch (endpoint_info.state) { + case GRPC_CHANNEL_READY: + return endpoint_info.picker->Pick(args); + case GRPC_CHANNEL_IDLE: + new EndpointConnectionAttempter( + ring_hash_->Ref(DEBUG_LOCATION, "EndpointConnectionAttempter"), + endpoint_info.endpoint); + ABSL_FALLTHROUGH_INTENDED; + case GRPC_CHANNEL_CONNECTING: + return PickResult::Queue(); + default: + break; } } return PickResult::Fail(absl::UnavailableError(absl::StrCat( - "ring hash cannot find a connected subchannel; first failure: ", - first_subchannel.status.ToString()))); + "ring hash cannot find a connected endpoint; first failure: ", + endpoints_[ring[index].endpoint_index].status.message()))); } // -// RingHash::RingHashSubchannelList::Ring +// RingHash::Ring // -RingHash::RingHashSubchannelList::Ring::Ring( - RingHashLbConfig* config, RingHashSubchannelList* subchannel_list, - const ChannelArgs& args) { +RingHash::Ring::Ring(RingHash* ring_hash, RingHashLbConfig* config) { // Store the weights while finding the sum. struct AddressWeight { std::string address; @@ -467,15 +381,15 @@ RingHash::RingHashSubchannelList::Ring::Ring( }; std::vector address_weights; size_t sum = 0; - address_weights.reserve(subchannel_list->num_subchannels()); - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { - RingHashSubchannelData* sd = subchannel_list->subchannel(i); - auto weight_arg = sd->address().args().GetInt(GRPC_ARG_ADDRESS_WEIGHT); + const ServerAddressList& addresses = ring_hash->addresses_; + address_weights.reserve(addresses.size()); + for (const auto& address : addresses) { AddressWeight address_weight; address_weight.address = - grpc_sockaddr_to_string(&sd->address().address(), false).value(); + grpc_sockaddr_to_string(&address.address(), false).value(); // Weight should never be zero, but ignore it just in case, since // that value would screw up the ring-building algorithm. + auto weight_arg = address.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT); if (weight_arg.value_or(0) > 0) { address_weight.weight = *weight_arg; } @@ -499,8 +413,9 @@ RingHash::RingHashSubchannelList::Ring::Ring( // weights aren't provided, all hosts should get an equal number of hashes. In // the case where this number exceeds the max_ring_size, it's scaled back down // to fit. - const size_t ring_size_cap = args.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP) - .value_or(kRingSizeCapDefault); + const size_t ring_size_cap = + ring_hash->args_.GetInt(GRPC_ARG_RING_HASH_LB_RING_SIZE_CAP) + .value_or(kRingSizeCapDefault); const size_t min_ring_size = std::min(config->min_ring_size(), ring_size_cap); const size_t max_ring_size = std::min(config->max_ring_size(), ring_size_cap); const double scale = std::min( @@ -519,7 +434,7 @@ RingHash::RingHashSubchannelList::Ring::Ring( double target_hashes = 0.0; uint64_t min_hashes_per_host = ring_size; uint64_t max_hashes_per_host = 0; - for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { + for (size_t i = 0; i < addresses.size(); ++i) { const std::string& address_string = address_weights[i].address; hash_key_buffer.assign(address_string.begin(), address_string.end()); hash_key_buffer.emplace_back('_'); @@ -549,210 +464,125 @@ RingHash::RingHashSubchannelList::Ring::Ring( } // -// RingHash::RingHashSubchannelList +// RingHash::RingHashEndpoint::Helper // -RingHash::RingHashSubchannelList::RingHashSubchannelList( - RingHash* policy, ServerAddressList addresses, const ChannelArgs& args) - : SubchannelList(policy, - (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) - ? "RingHashSubchannelList" - : nullptr), - std::move(addresses), policy->channel_control_helper(), - args), - num_idle_(num_subchannels()) { - // Need to maintain a ref to the LB policy as long as we maintain - // any references to subchannels, since the subchannels' - // pollset_sets will include the LB policy's pollset_set. - policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); - // Construct the ring. - ring_ = MakeRefCounted(policy->config_.get(), this, args); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, - "[RH %p] created subchannel list %p with %" PRIuPTR " ring entries", - policy, this, ring_->ring().size()); +class RingHash::RingHashEndpoint::Helper + : public LoadBalancingPolicy::DelegatingChannelControlHelper { + public: + explicit Helper(RefCountedPtr endpoint) + : endpoint_(std::move(endpoint)) {} + + ~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); } + + void UpdateState( + grpc_connectivity_state state, const absl::Status& status, + RefCountedPtr picker) override { + endpoint_->OnStateUpdate(state, status, std::move(picker)); } -} -void RingHash::RingHashSubchannelList::UpdateStateCountersLocked( - grpc_connectivity_state old_state, grpc_connectivity_state new_state) { - if (old_state == GRPC_CHANNEL_IDLE) { - GPR_ASSERT(num_idle_ > 0); - --num_idle_; - } else if (old_state == GRPC_CHANNEL_READY) { - GPR_ASSERT(num_ready_ > 0); - --num_ready_; - } else if (old_state == GRPC_CHANNEL_CONNECTING) { - GPR_ASSERT(num_connecting_ > 0); - --num_connecting_; - } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - GPR_ASSERT(num_transient_failure_ > 0); - --num_transient_failure_; + private: + LoadBalancingPolicy::ChannelControlHelper* parent_helper() const override { + return endpoint_->ring_hash_->channel_control_helper(); } - GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); - if (new_state == GRPC_CHANNEL_IDLE) { - ++num_idle_; - } else if (new_state == GRPC_CHANNEL_READY) { - ++num_ready_; - } else if (new_state == GRPC_CHANNEL_CONNECTING) { - ++num_connecting_; - } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - ++num_transient_failure_; + + RefCountedPtr endpoint_; +}; + +// +// RingHash::RingHashEndpoint +// + +void RingHash::RingHashEndpoint::Orphan() { + if (child_policy_ != nullptr) { + // Remove pollset_set linkage. + grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), + ring_hash_->interested_parties()); + child_policy_.reset(); + picker_.reset(); } + Unref(); } -void RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked( - size_t index, bool connection_attempt_complete, absl::Status status) { - RingHash* p = static_cast(policy()); - // If this is latest_pending_subchannel_list_, then swap it into - // subchannel_list_ as soon as we get the initial connectivity state - // report for every subchannel in the list. - if (p->latest_pending_subchannel_list_.get() == this && - AllSubchannelsSeenInitialState()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, "[RH %p] replacing subchannel list %p with %p", p, - p->subchannel_list_.get(), this); - } - p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); - } - // Only set connectivity state if this is the current subchannel list. - if (p->subchannel_list_.get() != this) return; - // The overall aggregation rules here are: - // 1. If there is at least one subchannel in READY state, report READY. - // 2. If there are 2 or more subchannels in TRANSIENT_FAILURE state, report - // TRANSIENT_FAILURE. - // 3. If there is at least one subchannel in CONNECTING state, report - // CONNECTING. - // 4. If there is one subchannel in TRANSIENT_FAILURE state and there is - // more than one subchannel, report CONNECTING. - // 5. If there is at least one subchannel in IDLE state, report IDLE. - // 6. Otherwise, report TRANSIENT_FAILURE. - // - // We set start_connection_attempt to true if we match rules 2, 3, or 6. - grpc_connectivity_state state; - bool start_connection_attempt = false; - if (num_ready_ > 0) { - state = GRPC_CHANNEL_READY; - } else if (num_transient_failure_ >= 2) { - state = GRPC_CHANNEL_TRANSIENT_FAILURE; - start_connection_attempt = true; - } else if (num_connecting_ > 0) { - state = GRPC_CHANNEL_CONNECTING; - } else if (num_transient_failure_ == 1 && num_subchannels() > 1) { - state = GRPC_CHANNEL_CONNECTING; - start_connection_attempt = true; - } else if (num_idle_ > 0) { - state = GRPC_CHANNEL_IDLE; - } else { - state = GRPC_CHANNEL_TRANSIENT_FAILURE; - start_connection_attempt = true; - } - // In TRANSIENT_FAILURE, report the last reported failure. - // Otherwise, report OK. - if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - if (!status.ok()) { - last_failure_ = absl::UnavailableError(absl::StrCat( - "no reachable subchannels; last error: ", status.ToString())); - } - status = last_failure_; +void RingHash::RingHashEndpoint::ResetBackoffLocked() { + if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); +} + +void RingHash::RingHashEndpoint::RequestConnectionLocked() { + if (child_policy_ == nullptr) { + CreateChildPolicy(); } else { - status = absl::OkStatus(); - } - // Generate new picker and return it to the channel. - // Note that we use our own picker regardless of connectivity state. - p->channel_control_helper()->UpdateState( - state, status, - MakeRefCounted(p->Ref(DEBUG_LOCATION, "RingHashPicker"), this)); - // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will - // not be getting any pick requests from the priority policy. - // However, because the ring_hash policy does not attempt to - // reconnect to subchannels unless it is getting pick requests, - // it will need special handling to ensure that it will eventually - // recover from TRANSIENT_FAILURE state once the problem is resolved. - // Specifically, it will make sure that it is attempting to connect to - // at least one subchannel at any given time. After a given subchannel - // fails a connection attempt, it will move on to the next subchannel - // in the ring. It will keep doing this until one of the subchannels - // successfully connects, at which point it will report READY and stop - // proactively trying to connect. The policy will remain in - // TRANSIENT_FAILURE until at least one subchannel becomes connected, - // even if subchannels are in state CONNECTING during that time. - // - // Note that we do the same thing when the policy is in state - // CONNECTING, just to ensure that we don't remain in CONNECTING state - // indefinitely if there are no new picks coming in. - if (internally_triggered_connection_index_.has_value() && - *internally_triggered_connection_index_ == index && - connection_attempt_complete) { - internally_triggered_connection_index_.reset(); - } - if (start_connection_attempt && - !internally_triggered_connection_index_.has_value()) { - size_t next_index = (index + 1) % num_subchannels(); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, - "[RH %p] triggering internal connection attempt for subchannel " - "%p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR ")", - p, subchannel(next_index)->subchannel(), this, next_index, - num_subchannels()); - } - internally_triggered_connection_index_ = next_index; - subchannel(next_index)->subchannel()->RequestConnection(); + child_policy_->ExitIdleLocked(); } } -// -// RingHash::RingHashSubchannelData -// +void RingHash::RingHashEndpoint::CreateChildPolicy() { + GPR_ASSERT(child_policy_ == nullptr); + const ServerAddress& address = ring_hash_->addresses_[index_]; + LoadBalancingPolicy::Args lb_policy_args; + auto child_args = + ring_hash_->args_ + .Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true) + .Set(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX, true); + lb_policy_args.work_serializer = ring_hash_->work_serializer(); + lb_policy_args.args = child_args; + lb_policy_args.channel_control_helper = + std::make_unique(Ref(DEBUG_LOCATION, "Helper")); + child_policy_ = + CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy( + "pick_first", std::move(lb_policy_args)); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] endpoint %p (index %" PRIuPTR " of %" PRIuPTR + ", %s): created child policy %p", + ring_hash_.get(), this, index_, ring_hash_->addresses_.size(), + address.ToString().c_str(), child_policy_.get()); + } + // Add our interested_parties pollset_set to that of the newly created + // child policy. This will make the child policy progress upon activity on + // this policy, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), + ring_hash_->interested_parties()); + // Construct pick_first config. + auto config = + CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig( + Json::FromArray( + {Json::FromObject({{"pick_first", Json::FromObject({})}})})); + GPR_ASSERT(config.ok()); + // Update child policy. + LoadBalancingPolicy::UpdateArgs update_args; + update_args.addresses.emplace().emplace_back(address); + update_args.args = std::move(child_args); + update_args.config = std::move(*config); + // TODO(roth): If the child reports a non-OK status with the update, + // we need to propagate that back to the resolver somehow. + (void)child_policy_->UpdateLocked(std::move(update_args)); +} -void RingHash::RingHashSubchannelData::ProcessConnectivityChangeLocked( - absl::optional old_state, - grpc_connectivity_state new_state) { - RingHash* p = static_cast(subchannel_list()->policy()); +void RingHash::RingHashEndpoint::OnStateUpdate( + grpc_connectivity_state new_state, const absl::Status& status, + RefCountedPtr picker) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log( GPR_INFO, - "[RH %p] connectivity changed for subchannel %p, subchannel_list %p " - "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", - p, subchannel(), subchannel_list(), Index(), - subchannel_list()->num_subchannels(), - ConnectivityStateName(logical_connectivity_state_), - ConnectivityStateName(new_state)); - } - GPR_ASSERT(subchannel() != nullptr); - // If this is not the initial state notification and the new state is - // TRANSIENT_FAILURE or IDLE, re-resolve. - // Note that we don't want to do this on the initial state notification, - // because that would result in an endless loop of re-resolution. - if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || - new_state == GRPC_CHANNEL_IDLE)) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, - "[RH %p] Subchannel %p reported %s; requesting re-resolution", p, - subchannel(), ConnectivityStateName(new_state)); - } - p->channel_control_helper()->RequestReresolution(); - } - const bool connection_attempt_complete = new_state != GRPC_CHANNEL_CONNECTING; - // Decide what state to report for the purposes of aggregation and - // picker behavior. - // If the last recorded state was TRANSIENT_FAILURE, ignore the change - // unless the new state is READY (or TF again, in which case we need - // to update the status). - if (logical_connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE || - new_state == GRPC_CHANNEL_READY || - new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { - // Update state counters used for aggregation. - subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, - new_state); - // Update logical state. - logical_connectivity_state_ = new_state; - logical_connectivity_status_ = connectivity_status(); + "[RH %p] connectivity changed for endpoint %p (%s, child_policy=%p): " + "prev_state=%s new_state=%s (%s)", + ring_hash_.get(), this, + ring_hash_->addresses_[index_].ToString().c_str(), child_policy_.get(), + ConnectivityStateName(connectivity_state_), + ConnectivityStateName(new_state), status.ToString().c_str()); } - // Update the RH policy's connectivity state, creating new picker and new - // ring. - subchannel_list()->UpdateRingHashConnectivityStateLocked( - Index(), connection_attempt_complete, logical_connectivity_status_); + if (child_policy_ == nullptr) return; // Already orphaned. + // Update state. + const bool entered_transient_failure = + connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE && + new_state == GRPC_CHANNEL_TRANSIENT_FAILURE; + connectivity_state_ = new_state; + status_ = status; + picker_ = std::move(picker); + // Update the aggregated connectivity state. + ring_hash_->UpdateAggregatedConnectivityStateLocked(entered_transient_failure, + status); } // @@ -769,8 +599,6 @@ RingHash::~RingHash() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] Destroying Ring Hash policy", this); } - GPR_ASSERT(subchannel_list_ == nullptr); - GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } void RingHash::ShutdownLocked() { @@ -778,73 +606,212 @@ void RingHash::ShutdownLocked() { gpr_log(GPR_INFO, "[RH %p] Shutting down", this); } shutdown_ = true; - subchannel_list_.reset(); - latest_pending_subchannel_list_.reset(); + endpoint_map_.clear(); } void RingHash::ResetBackoffLocked() { - subchannel_list_->ResetBackoffLocked(); - if (latest_pending_subchannel_list_ != nullptr) { - latest_pending_subchannel_list_->ResetBackoffLocked(); + for (const auto& p : endpoint_map_) { + p.second->ResetBackoffLocked(); } } absl::Status RingHash::UpdateLocked(UpdateArgs args) { - config_ = std::move(args.config); - ServerAddressList addresses; + // Check address list. if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", this, args.addresses->size()); } - addresses = *std::move(args.addresses); + addresses_ = *std::move(args.addresses); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", this, args.addresses.status().ToString().c_str()); } - // If we already have a subchannel list, then keep using the existing + // If we already have an endpoint list, then keep using the existing // list, but still report back that the update was not accepted. - if (subchannel_list_ != nullptr) return args.addresses.status(); + if (!addresses_.empty()) return args.addresses.status(); } - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) && - latest_pending_subchannel_list_ != nullptr) { - gpr_log(GPR_INFO, "[RH %p] replacing latest pending subchannel list %p", - this, latest_pending_subchannel_list_.get()); + // Save channel args. + args_ = std::move(args.args); + // Build new ring. + ring_ = MakeRefCounted( + this, static_cast(args.config.get())); + // Update endpoint map. + std::map> endpoint_map; + for (size_t i = 0; i < addresses_.size(); ++i) { + const ServerAddress& address = addresses_[i]; + // If present in old map, retain it; otherwise, create a new one. + auto it = endpoint_map_.find(address); + if (it != endpoint_map_.end()) { + it->second->set_index(i); + endpoint_map.emplace(address, std::move(it->second)); + } else { + endpoint_map.emplace(address, MakeOrphanable(Ref(), i)); + } + } + endpoint_map_ = std::move(endpoint_map); + // If the address list is empty, report TRANSIENT_FAILURE. + // TODO(roth): As part of adding dualstack backend support, we need to + // also handle the case where the list of addresses for a given + // endpoint is empty. + if (addresses_.empty()) { + absl::Status status = + args.addresses.ok() ? absl::UnavailableError(absl::StrCat( + "empty address list: ", args.resolution_note)) + : args.addresses.status(); + channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, status, + MakeRefCounted(status)); + return status; } - latest_pending_subchannel_list_ = MakeRefCounted( - this, std::move(addresses), args.args); - latest_pending_subchannel_list_->StartWatchingLocked(args.args); - // If we have no existing list or the new list is empty, immediately - // promote the new list. - // Otherwise, do nothing; the new list will be promoted when the - // initial subchannel states are reported. - if (subchannel_list_ == nullptr || - latest_pending_subchannel_list_->num_subchannels() == 0) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace) && - subchannel_list_ != nullptr) { - gpr_log(GPR_INFO, - "[RH %p] empty address list, replacing subchannel list %p", this, - subchannel_list_.get()); + // Return a new picker. + UpdateAggregatedConnectivityStateLocked(/*entered_transient_failure=*/false, + absl::OkStatus()); + return absl::OkStatus(); +} + +void RingHash::UpdateAggregatedConnectivityStateLocked( + bool entered_transient_failure, absl::Status status) { + // Count the number of endpoints in each state. + size_t num_idle = 0; + size_t num_connecting = 0; + size_t num_ready = 0; + size_t num_transient_failure = 0; + for (const auto& p : endpoint_map_) { + switch (p.second->connectivity_state()) { + case GRPC_CHANNEL_READY: + ++num_ready; + break; + case GRPC_CHANNEL_IDLE: + ++num_idle; + break; + case GRPC_CHANNEL_CONNECTING: + ++num_connecting; + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + ++num_transient_failure; + break; + default: + Crash("child policy should never report SHUTDOWN"); } - subchannel_list_ = std::move(latest_pending_subchannel_list_); - // If the new list is empty, report TRANSIENT_FAILURE. - if (subchannel_list_->num_subchannels() == 0) { - absl::Status status = - args.addresses.ok() - ? absl::UnavailableError( - absl::StrCat("empty address list: ", args.resolution_note)) - : args.addresses.status(); - channel_control_helper()->UpdateState( - GRPC_CHANNEL_TRANSIENT_FAILURE, status, - MakeRefCounted(status)); - return status; + } + // The overall aggregation rules here are: + // 1. If there is at least one endpoint in READY state, report READY. + // 2. If there are 2 or more endpoints in TRANSIENT_FAILURE state, report + // TRANSIENT_FAILURE. + // 3. If there is at least one endpoint in CONNECTING state, report + // CONNECTING. + // 4. If there is one endpoint in TRANSIENT_FAILURE state and there is + // more than one endpoint, report CONNECTING. + // 5. If there is at least one endpoint in IDLE state, report IDLE. + // 6. Otherwise, report TRANSIENT_FAILURE. + // + // We set start_connection_attempt to true if we match rules 2, 4, or 6. + grpc_connectivity_state state; + bool start_connection_attempt = false; + if (num_ready > 0) { + state = GRPC_CHANNEL_READY; + } else if (num_transient_failure >= 2) { + state = GRPC_CHANNEL_TRANSIENT_FAILURE; + start_connection_attempt = true; + } else if (num_connecting > 0) { + state = GRPC_CHANNEL_CONNECTING; + } else if (num_transient_failure == 1 && addresses_.size() > 1) { + state = GRPC_CHANNEL_CONNECTING; + start_connection_attempt = true; + } else if (num_idle > 0) { + state = GRPC_CHANNEL_IDLE; + } else { + state = GRPC_CHANNEL_TRANSIENT_FAILURE; + start_connection_attempt = true; + } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] setting connectivity state to %s (num_idle=%" PRIuPTR + ", num_connecting=%" PRIuPTR ", num_ready=%" PRIuPTR + ", num_transient_failure=%" PRIuPTR ", size=%" PRIuPTR + ") -- start_connection_attempt=%d", + this, ConnectivityStateName(state), num_idle, num_connecting, + num_ready, num_transient_failure, addresses_.size(), + start_connection_attempt); + } + // In TRANSIENT_FAILURE, report the last reported failure. + // Otherwise, report OK. + if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + if (!status.ok()) { + last_failure_ = absl::UnavailableError(absl::StrCat( + "no reachable endpoints; last error: ", status.message())); + } + status = last_failure_; + } else { + status = absl::OkStatus(); + } + // Generate new picker and return it to the channel. + // Note that we use our own picker regardless of connectivity state. + channel_control_helper()->UpdateState( + state, status, + MakeRefCounted(Ref(DEBUG_LOCATION, "RingHashPicker"))); + // While the ring_hash policy is reporting TRANSIENT_FAILURE, it will + // not be getting any pick requests from the priority policy. + // However, because the ring_hash policy does not attempt to + // reconnect to endpoints unless it is getting pick requests, + // it will need special handling to ensure that it will eventually + // recover from TRANSIENT_FAILURE state once the problem is resolved. + // Specifically, it will make sure that it is attempting to connect to + // at least one endpoint at any given time. But we don't want to just + // try to connect to only one endpoint, because if that particular + // endpoint happens to be down but the rest are reachable, we would + // incorrectly fail to recover. + // + // So, to handle this, whenever an endpoint initially enters + // TRANSIENT_FAILURE state (i.e., its initial connection attempt has + // failed), if there are no endpoints currently in CONNECTING state + // (i.e., they are still trying their initial connection attempt), + // then we will trigger a connection attempt for the first endpoint + // that is currently in state IDLE, if any. + // + // Note that once an endpoint enters TRANSIENT_FAILURE state, it will + // stay in that state and automatically retry after appropriate backoff, + // never stopping until it establishes a connection. This means that + // if we stay in TRANSIENT_FAILURE for a long period of time, we will + // eventually be trying *all* endpoints, which probably isn't ideal. + // But it's no different than what can happen if ring_hash is the root + // LB policy and we keep getting picks, so it's not really a new + // problem. If/when it becomes an issue, we can figure out how to + // address it. + // + // Note that we do the same thing when the policy is in state + // CONNECTING, just to ensure that we don't remain in CONNECTING state + // indefinitely if there are no new picks coming in. + if (start_connection_attempt && entered_transient_failure) { + size_t first_idle_index = addresses_.size(); + for (size_t i = 0; i < addresses_.size(); ++i) { + auto it = endpoint_map_.find(addresses_[i]); + GPR_ASSERT(it != endpoint_map_.end()); + if (it->second->connectivity_state() == GRPC_CHANNEL_CONNECTING) { + first_idle_index = addresses_.size(); + break; + } + if (first_idle_index == addresses_.size() && + it->second->connectivity_state() == GRPC_CHANNEL_IDLE) { + first_idle_index = i; + } + } + if (first_idle_index != addresses_.size()) { + auto it = endpoint_map_.find(addresses_[first_idle_index]); + GPR_ASSERT(it != endpoint_map_.end()); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] triggering internal connection attempt for endpoint " + "%p (%s) (index %" PRIuPTR " of %" PRIuPTR ")", + this, it->second.get(), + addresses_[first_idle_index].ToString().c_str(), + first_idle_index, addresses_.size()); + } + it->second->RequestConnectionLocked(); } - // Otherwise, report IDLE. - subchannel_list_->UpdateRingHashConnectivityStateLocked( - /*index=*/0, /*connection_attempt_complete=*/false, absl::OkStatus()); } - return absl::OkStatus(); } // diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h deleted file mode 100644 index 87072c7381c..00000000000 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ /dev/null @@ -1,488 +0,0 @@ -// -// Copyright 2015 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H -#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H - -#include - -#include -#include - -#include -#include -#include -#include - -#include "absl/status/status.h" -#include "absl/types/optional.h" - -#include -#include -#include - -#include "src/core/ext/filters/client_channel/client_channel_internal.h" -#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/dual_ref_counted.h" -#include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/work_serializer.h" -#include "src/core/lib/iomgr/iomgr_fwd.h" -#include "src/core/lib/load_balancing/lb_policy.h" -#include "src/core/lib/load_balancing/subchannel_interface.h" -#include "src/core/lib/resolver/server_address.h" -#include "src/core/lib/transport/connectivity_state.h" - -// Code for maintaining a list of subchannels within an LB policy. -// -// To use this, callers must create their own subclasses, like so: -// - -// class MySubchannelList; // Forward declaration. - -// class MySubchannelData -// : public SubchannelData { -// public: -// void ProcessConnectivityChangeLocked( -// absl::optional old_state, -// grpc_connectivity_state new_state) override { -// // ...code to handle connectivity changes... -// } -// }; - -// class MySubchannelList -// : public SubchannelList { -// }; - -// -// All methods will be called from within the client_channel work serializer. - -namespace grpc_core { - -// Forward declaration. -template -class SubchannelList; - -// Stores data for a particular subchannel in a subchannel list. -// Callers must create a subclass that implements the -// ProcessConnectivityChangeLocked() method. -template -class SubchannelData { - public: - // Returns a pointer to the subchannel list containing this object. - SubchannelListType* subchannel_list() const { - return static_cast(subchannel_list_); - } - - // Returns the index into the subchannel list of this object. - size_t Index() const { - return static_cast(static_cast(this) - - subchannel_list_->subchannel(0)); - } - - // Returns a pointer to the subchannel. - SubchannelInterface* subchannel() const { return subchannel_.get(); } - - // Returns the cached connectivity state, if any. - absl::optional connectivity_state() { - return connectivity_state_; - } - absl::Status connectivity_status() { return connectivity_status_; } - - // Resets the connection backoff. - void ResetBackoffLocked(); - - // Cancels any pending connectivity watch and unrefs the subchannel. - void ShutdownLocked(); - - protected: - SubchannelData( - SubchannelList* subchannel_list, - const ServerAddress& address, - RefCountedPtr subchannel); - - virtual ~SubchannelData(); - - // This method will be invoked once soon after instantiation to report - // the current connectivity state, and it will then be invoked again - // whenever the connectivity state changes. - virtual void ProcessConnectivityChangeLocked( - absl::optional old_state, - grpc_connectivity_state new_state) = 0; - - private: - // For accessing StartConnectivityWatchLocked(). - friend class SubchannelList; - - // Watcher for subchannel connectivity state. - class Watcher - : public SubchannelInterface::ConnectivityStateWatcherInterface { - public: - Watcher( - SubchannelData* subchannel_data, - WeakRefCountedPtr subchannel_list) - : subchannel_data_(subchannel_data), - subchannel_list_(std::move(subchannel_list)) {} - - ~Watcher() override { - subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); - } - - void OnConnectivityStateChange(grpc_connectivity_state new_state, - absl::Status status) override; - - grpc_pollset_set* interested_parties() override { - return subchannel_list_->policy()->interested_parties(); - } - - private: - SubchannelData* subchannel_data_; - WeakRefCountedPtr subchannel_list_; - }; - - // Starts watching the connectivity state of the subchannel. - // ProcessConnectivityChangeLocked() will be called whenever the - // connectivity state changes. - void StartConnectivityWatchLocked(const ChannelArgs& args); - - // Cancels watching the connectivity state of the subchannel. - void CancelConnectivityWatchLocked(const char* reason); - - // Unrefs the subchannel. - void UnrefSubchannelLocked(const char* reason); - - // Backpointer to owning subchannel list. Not owned. - SubchannelList* subchannel_list_; - // The subchannel. - RefCountedPtr subchannel_; - // Will be non-null when the subchannel's state is being watched. - SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = - nullptr; - SubchannelInterface::DataWatcherInterface* health_watcher_ = nullptr; - // Data updated by the watcher. - absl::optional connectivity_state_; - absl::Status connectivity_status_; -}; - -// A list of subchannels. -template -class SubchannelList : public DualRefCounted { - public: - // Starts watching the connectivity state of all subchannels. - // Must be called immediately after instantiation. - void StartWatchingLocked(const ChannelArgs& args); - - // The number of subchannels in the list. - size_t num_subchannels() const { return subchannels_.size(); } - - // The data for the subchannel at a particular index. - SubchannelDataType* subchannel(size_t index) { - return subchannels_[index].get(); - } - - // Returns true if the subchannel list is shutting down. - bool shutting_down() const { return shutting_down_; } - - // Accessors. - LoadBalancingPolicy* policy() const { return policy_; } - const char* tracer() const { return tracer_; } - - // Resets connection backoff of all subchannels. - void ResetBackoffLocked(); - - // Returns true if all subchannels have seen their initial - // connectivity state notifications. - bool AllSubchannelsSeenInitialState(); - - void Orphan() override; - - protected: - SubchannelList(LoadBalancingPolicy* policy, const char* tracer, - ServerAddressList addresses, - LoadBalancingPolicy::ChannelControlHelper* helper, - const ChannelArgs& args); - - virtual ~SubchannelList(); - - private: - // For accessing Ref() and Unref(). - friend class SubchannelData; - - virtual std::shared_ptr work_serializer() const = 0; - - // Backpointer to owning policy. - LoadBalancingPolicy* policy_; - - const char* tracer_; - - absl::optional health_check_service_name_; - - // The list of subchannels. - // We use ManualConstructor here to support SubchannelDataType classes - // that are not copyable. - std::vector> subchannels_; - - // Is this list shutting down? This may be true due to the shutdown of the - // policy itself or because a newer update has arrived while this one hadn't - // finished processing. - bool shutting_down_ = false; -}; - -// -// implementation -- no user-servicable parts below -// - -// -// SubchannelData::Watcher -// - -template -void SubchannelData::Watcher:: - OnConnectivityStateChange(grpc_connectivity_state new_state, - absl::Status status) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log( - GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, " - "status=%s, shutting_down=%d, pending_watcher=%p, health_watcher=%p", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_.get(), subchannel_data_->Index(), - subchannel_list_->num_subchannels(), - subchannel_data_->subchannel_.get(), - (subchannel_data_->connectivity_state_.has_value() - ? ConnectivityStateName(*subchannel_data_->connectivity_state_) - : "N/A"), - ConnectivityStateName(new_state), status.ToString().c_str(), - subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_, - subchannel_data_->health_watcher_); - } - if (!subchannel_list_->shutting_down() && - (subchannel_data_->pending_watcher_ != nullptr || - subchannel_data_->health_watcher_ != nullptr)) { - absl::optional old_state = - subchannel_data_->connectivity_state_; - subchannel_data_->connectivity_state_ = new_state; - subchannel_data_->connectivity_status_ = status; - // Call the subclass's ProcessConnectivityChangeLocked() method. - subchannel_data_->ProcessConnectivityChangeLocked(old_state, new_state); - } -} - -// -// SubchannelData -// - -template -SubchannelData::SubchannelData( - SubchannelList* subchannel_list, - const ServerAddress& /*address*/, - RefCountedPtr subchannel) - : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {} - -template -SubchannelData::~SubchannelData() { - GPR_ASSERT(subchannel_ == nullptr); -} - -template -void SubchannelData:: - UnrefSubchannelLocked(const char* reason) { - if (subchannel_ != nullptr) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): unreffing subchannel (%s)", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), reason); - } - subchannel_.reset(); - } -} - -template -void SubchannelData::ResetBackoffLocked() { - if (subchannel_ != nullptr) { - subchannel_->ResetBackoff(); - } -} - -template -void SubchannelData:: - StartConnectivityWatchLocked(const ChannelArgs& args) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log( - GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): starting watch " - "(health_check_service_name=\"%s\")", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), - subchannel_list()->health_check_service_name_.value_or("N/A").c_str()); - } - GPR_ASSERT(pending_watcher_ == nullptr); - GPR_ASSERT(health_watcher_ == nullptr); - auto watcher = std::make_unique( - this, subchannel_list()->WeakRef(DEBUG_LOCATION, "Watcher")); - if (subchannel_list()->health_check_service_name_.has_value()) { - auto health_watcher = MakeHealthCheckWatcher( - subchannel_list_->work_serializer(), args, std::move(watcher)); - health_watcher_ = health_watcher.get(); - subchannel_->AddDataWatcher(std::move(health_watcher)); - } else { - pending_watcher_ = watcher.get(); - subchannel_->WatchConnectivityState(std::move(watcher)); - } -} - -template -void SubchannelData:: - CancelConnectivityWatchLocked(const char* reason) { - if (pending_watcher_ != nullptr) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling connectivity watch (%s)", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), reason); - } - subchannel_->CancelConnectivityStateWatch(pending_watcher_); - pending_watcher_ = nullptr; - } else if (health_watcher_ != nullptr) { - if (GPR_UNLIKELY(subchannel_list_->tracer() != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling health watch (%s)", - subchannel_list_->tracer(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_.get(), reason); - } - subchannel_->CancelDataWatcher(health_watcher_); - health_watcher_ = nullptr; - } -} - -template -void SubchannelData::ShutdownLocked() { - CancelConnectivityWatchLocked("shutdown"); - UnrefSubchannelLocked("shutdown"); -} - -// -// SubchannelList -// - -template -SubchannelList::SubchannelList( - LoadBalancingPolicy* policy, const char* tracer, - ServerAddressList addresses, - LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args) - : DualRefCounted(tracer), - policy_(policy), - tracer_(tracer) { - if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) { - health_check_service_name_ = - args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME); - } - if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer_, policy, this, addresses.size()); - } - subchannels_.reserve(addresses.size()); - // Create a subchannel for each address. - for (ServerAddress address : addresses) { - RefCountedPtr subchannel = - helper->CreateSubchannel(address, args); - if (subchannel == nullptr) { - // Subchannel could not be created. - if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] could not create subchannel for address %s, ignoring", - tracer_, policy_, address.ToString().c_str()); - } - continue; - } - if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR - ": Created subchannel %p for address %s", - tracer_, policy_, this, subchannels_.size(), subchannel.get(), - address.ToString().c_str()); - } - subchannels_.emplace_back(); - subchannels_.back().Init(this, std::move(address), std::move(subchannel)); - } -} - -template -SubchannelList::~SubchannelList() { - if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_, policy_, - this); - } - for (auto& sd : subchannels_) { - sd.Destroy(); - } -} - -template -void SubchannelList:: - StartWatchingLocked(const ChannelArgs& args) { - for (auto& sd : subchannels_) { - sd->StartConnectivityWatchLocked(args); - } -} - -template -void SubchannelList::Orphan() { - if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_, - policy_, this); - } - GPR_ASSERT(!shutting_down_); - shutting_down_ = true; - for (auto& sd : subchannels_) { - sd->ShutdownLocked(); - } -} - -template -void SubchannelList::ResetBackoffLocked() { - for (auto& sd : subchannels_) { - sd->ResetBackoffLocked(); - } -} - -template -bool SubchannelList::AllSubchannelsSeenInitialState() { - for (size_t i = 0; i < num_subchannels(); ++i) { - if (!subchannel(i)->connectivity_state().has_value()) return false; - } - return true; -} - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H diff --git a/src/core/lib/resolver/server_address.h b/src/core/lib/resolver/server_address.h index d42a70f9d74..e5c62638eca 100644 --- a/src/core/lib/resolver/server_address.h +++ b/src/core/lib/resolver/server_address.h @@ -58,6 +58,7 @@ class ServerAddress { ServerAddress& operator=(ServerAddress&& other) noexcept; bool operator==(const ServerAddress& other) const { return Cmp(other) == 0; } + bool operator<(const ServerAddress& other) const { return Cmp(other) < 0; } int Cmp(const ServerAddress& other) const; diff --git a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc index 9a4b112e656..ef06120bc5b 100644 --- a/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc @@ -995,7 +995,7 @@ TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) { CheckRpcSendFailure( DEBUG_LOCATION, StatusCode::UNAVAILABLE, MakeConnectionFailureRegex( - "ring hash cannot find a connected subchannel; first failure: "), + "ring hash cannot find a connected endpoint; first failure: "), RpcOptions().set_metadata(std::move(metadata))); StartBackend(0); // Ensure we are actively connecting without any traffic. @@ -1034,7 +1034,7 @@ TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) { CheckRpcSendFailure( DEBUG_LOCATION, StatusCode::UNAVAILABLE, MakeConnectionFailureRegex( - "ring hash cannot find a connected subchannel; first failure: "), + "ring hash cannot find a connected endpoint; first failure: "), rpc_options); gpr_log(GPR_INFO, "=== DONE WITH FIRST RPC ==="); EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false)); @@ -1070,7 +1070,7 @@ TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) { CheckRpcSendFailure( DEBUG_LOCATION, StatusCode::UNAVAILABLE, MakeConnectionFailureRegex( - "ring hash cannot find a connected subchannel; first failure: "), + "ring hash cannot find a connected endpoint; first failure: "), rpc_options); gpr_log(GPR_INFO, "=== STARTING BACKEND 1 ==="); StartBackend(1); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 66c493b1962..246f6a7f140 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1139,7 +1139,6 @@ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \ src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ -src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index d663516929c..68cb19c969b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -945,7 +945,6 @@ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc \ src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h \ src/core/ext/filters/client_channel/lb_policy/rls/rls.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ -src/core/ext/filters/client_channel/lb_policy/subchannel_list.h \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h \ src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc \