diff --git a/BUILD b/BUILD index a83ccf8d7d4..6e02ac04d52 100644 --- a/BUILD +++ b/BUILD @@ -2941,6 +2941,7 @@ grpc_cc_library( "//src/core:lib/resolver/endpoint_addresses.h", ], external_deps = [ + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/strings", diff --git a/src/core/BUILD b/src/core/BUILD index cbf8ed32ea0..351f101b819 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4140,6 +4140,7 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/container:inlined_vector", + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/strings", @@ -4667,6 +4668,7 @@ grpc_cc_library( "ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc", ], external_deps = [ + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/strings", @@ -4689,6 +4691,7 @@ grpc_cc_library( "no_destruct", "pollset_set", "ref_counted_string", + "resolved_address", "validation_errors", "//:channel_arg_names", "//:config", @@ -4837,6 +4840,7 @@ grpc_cc_library( "ext/filters/client_channel/lb_policy/address_filtering.h", ], external_deps = [ + "absl/functional:function_ref", "absl/status:statusor", "absl/strings", ], @@ -4845,6 +4849,7 @@ grpc_cc_library( "channel_args", "ref_counted", "ref_counted_string", + "resolved_address", "//:endpoint_addresses", "//:gpr_platform", "//:ref_counted_ptr", @@ -4914,6 +4919,7 @@ grpc_cc_library( "lb_policy", "subchannel_interface", "//:debug_location", + "//:endpoint_addresses", "//:gpr", "//:grpc_base", "//:ref_counted_ptr", @@ -4931,7 +4937,7 @@ grpc_cc_library( "ext/filters/client_channel/lb_policy/endpoint_list.h", ], external_deps = [ - "absl/functional:any_invocable", + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/types:optional", @@ -5325,6 +5331,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/strings", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 991cbee4fc8..b5064ceb20f 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1599,7 +1599,12 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked( Resolver::Result result) { // Construct update. LoadBalancingPolicy::UpdateArgs update_args; - update_args.addresses = std::move(result.addresses); + if (!result.addresses.ok()) { + update_args.addresses = result.addresses.status(); + } else { + update_args.addresses = std::make_shared( + std::move(*result.addresses)); + } update_args.config = std::move(lb_policy_config); update_args.resolution_note = std::move(result.resolution_note); // Remove the config selector from channel args so that we're not holding diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc index b6e8396b95e..bac15550d2d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc +++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc @@ -20,11 +20,13 @@ #include -#include #include +#include "absl/functional/function_ref.h" + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/resolved_address.h" namespace grpc_core { @@ -43,32 +45,63 @@ int HierarchicalPathArg::ChannelArgsCompare(const HierarchicalPathArg* a, return 0; } +namespace { + +class HierarchicalAddressIterator : public EndpointAddressesIterator { + public: + HierarchicalAddressIterator( + std::shared_ptr parent_it, + RefCountedStringValue child_name) + : parent_it_(std::move(parent_it)), child_name_(std::move(child_name)) {} + + void ForEach(absl::FunctionRef callback) + const override { + RefCountedPtr remaining_path_attr; + parent_it_->ForEach([&](const EndpointAddresses& endpoint) { + const auto* path_arg = endpoint.args().GetObject(); + if (path_arg == nullptr) return; + const std::vector& path = path_arg->path(); + auto it = path.begin(); + if (it == path.end()) return; + if (*it != child_name_) return; + ChannelArgs args = endpoint.args(); + ++it; + if (it != path.end()) { + std::vector remaining_path(it, path.end()); + if (remaining_path_attr == nullptr || + remaining_path_attr->path() != remaining_path) { + remaining_path_attr = + MakeRefCounted(std::move(remaining_path)); + } + args = args.SetObject(remaining_path_attr); + } + callback(EndpointAddresses(endpoint.addresses(), args)); + }); + } + + private: + std::shared_ptr parent_it_; + RefCountedStringValue child_name_; +}; + +} // namespace + absl::StatusOr MakeHierarchicalAddressMap( - const absl::StatusOr& addresses) { + absl::StatusOr> addresses) { if (!addresses.ok()) return addresses.status(); HierarchicalAddressMap result; - RefCountedPtr remaining_path_attr; - for (const EndpointAddresses& endpoint_addresses : *addresses) { - const auto* path_arg = - endpoint_addresses.args().GetObject(); - if (path_arg == nullptr) continue; + (*addresses)->ForEach([&](const EndpointAddresses& endpoint) { + const auto* path_arg = endpoint.args().GetObject(); + if (path_arg == nullptr) return; const std::vector& path = path_arg->path(); auto it = path.begin(); - if (it == path.end()) continue; - EndpointAddressesList& target_list = result[*it]; - ChannelArgs args = endpoint_addresses.args(); - ++it; - if (it != path.end()) { - std::vector remaining_path(it, path.end()); - if (remaining_path_attr == nullptr || - remaining_path_attr->path() != remaining_path) { - remaining_path_attr = - MakeRefCounted(std::move(remaining_path)); - } - args = args.SetObject(remaining_path_attr); + if (it == path.end()) return; + auto& target_list = result[*it]; + if (target_list == nullptr) { + target_list = + std::make_shared(*addresses, *it); } - target_list.emplace_back(endpoint_addresses.addresses(), args); - } + }); return result; } diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h index d0e2faae294..924261669bd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h +++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -105,12 +106,12 @@ class HierarchicalPathArg : public RefCounted { // A map from the next path element to the endpoint addresses that fall // under that path element. using HierarchicalAddressMap = - std::map, RefCountedStringValueLessThan>; // Splits up the addresses into a separate list for each child. absl::StatusOr MakeHierarchicalAddressMap( - const absl::StatusOr& addresses); + absl::StatusOr> addresses); } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc index 2c878bb2db0..7f2ecdd69f0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc @@ -118,7 +118,7 @@ void EndpointList::Endpoint::Init( GPR_ASSERT(config.ok()); // Update child policy. LoadBalancingPolicy::UpdateArgs update_args; - update_args.addresses.emplace().emplace_back(addresses); + update_args.addresses = std::make_shared(addresses); update_args.args = child_args; update_args.config = std::move(*config); // TODO(roth): If the child reports a non-OK status with the update, @@ -163,15 +163,16 @@ RefCountedPtr EndpointList::Endpoint::CreateSubchannel( // void EndpointList::Init( - const EndpointAddressesList& endpoints, const ChannelArgs& args, - absl::AnyInvocable(RefCountedPtr, - const EndpointAddresses&, - const ChannelArgs&)> + EndpointAddressesIterator* endpoints, const ChannelArgs& args, + absl::FunctionRef(RefCountedPtr, + const EndpointAddresses&, + const ChannelArgs&)> create_endpoint) { - for (const EndpointAddresses& addresses : endpoints) { + if (endpoints == nullptr) return; + endpoints->ForEach([&](const EndpointAddresses& endpoint) { endpoints_.push_back( - create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), addresses, args)); - } + create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args)); + }); } void EndpointList::ResetBackoffLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h index df31bc39c0e..c814d9f50eb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h @@ -25,7 +25,7 @@ #include #include -#include "absl/functional/any_invocable.h" +#include "absl/functional/function_ref.h" #include "absl/status/status.h" #include "absl/types/optional.h" @@ -53,7 +53,7 @@ namespace grpc_core { class MyEndpointList : public EndpointList { public: MyEndpointList(RefCountedPtr lb_policy, - const EndpointAddressesList& endpoints, + EndpointAddressesIterator* endpoints, const ChannelArgs& args) : EndpointList(std::move(lb_policy), GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer) @@ -184,8 +184,8 @@ class EndpointList : public InternallyRefCounted { EndpointList(RefCountedPtr policy, const char* tracer) : policy_(std::move(policy)), tracer_(tracer) {} - void Init(const EndpointAddressesList& endpoints, const ChannelArgs& args, - absl::AnyInvocable( + void Init(EndpointAddressesIterator* endpoints, const ChannelArgs& args, + absl::FunctionRef( RefCountedPtr, const EndpointAddresses&, const ChannelArgs&)> create_endpoint); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 4d3ebc1455d..0f4c7675f89 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -72,6 +72,7 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/functional/function_ref.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -384,9 +385,9 @@ class GrpcLb : public LoadBalancingPolicy { // Returns a text representation suitable for logging. std::string AsText() const; - // Extracts all non-drop entries into an EndpointAddressesList. - EndpointAddressesList GetServerAddressList( - GrpcLbClientStats* client_stats) const; + // Extracts all non-drop entries into an EndpointAddressesIterator. + std::shared_ptr GetServerAddressList( + GrpcLbClientStats* client_stats); // Returns true if the serverlist contains at least one drop entry and // no backend address entries. @@ -400,6 +401,8 @@ class GrpcLb : public LoadBalancingPolicy { const char* ShouldDrop(); private: + class AddressIterator; + std::vector serverlist_; // Accessed from the picker, so needs synchronization. @@ -504,6 +507,8 @@ class GrpcLb : public LoadBalancingPolicy { RefCountedPtr parent_; }; + class NullLbTokenEndpointIterator; + void ShutdownLocked() override; // Helper functions used in UpdateLocked(). @@ -569,7 +574,8 @@ class GrpcLb : public LoadBalancingPolicy { // Whether we're in fallback mode. bool fallback_mode_ = false; // The backend addresses from the resolver. - absl::StatusOr fallback_backend_addresses_; + absl::StatusOr> + fallback_backend_addresses_; // The last resolution note from our parent. // To be passed to child policy when fallback_backend_addresses_ is empty. std::string resolution_note_; @@ -594,11 +600,30 @@ class GrpcLb : public LoadBalancingPolicy { }; // -// GrpcLb::Serverlist +// GrpcLb::Serverlist::AddressIterator // -bool GrpcLb::Serverlist::operator==(const Serverlist& other) const { - return serverlist_ == other.serverlist_; +bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) { + if (server.drop) return false; + if (GPR_UNLIKELY(server.port >> 16 != 0)) { + if (log) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %" PRIuPTR + " of serverlist. Ignoring.", + server.port, idx); + } + return false; + } + if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) { + if (log) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR + " of serverlist. Ignoring", + server.ip_size, idx); + } + return false; + } + return true; } void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) { @@ -623,6 +648,53 @@ void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) { } } +class GrpcLb::Serverlist::AddressIterator : public EndpointAddressesIterator { + public: + AddressIterator(RefCountedPtr serverlist, + RefCountedPtr client_stats) + : serverlist_(std::move(serverlist)), + client_stats_(std::move(client_stats)) {} + + void ForEach(absl::FunctionRef callback) + const override { + for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) { + const GrpcLbServer& server = serverlist_->serverlist_[i]; + if (!IsServerValid(server, i, false)) continue; + // Address processing. + grpc_resolved_address addr; + ParseServer(server, &addr); + // LB token processing. + const size_t lb_token_length = strnlen( + server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token)); + std::string lb_token(server.load_balance_token, lb_token_length); + if (lb_token.empty()) { + auto addr_uri = grpc_sockaddr_to_uri(&addr); + gpr_log(GPR_INFO, + "Missing LB token for backend address '%s'. The empty token " + "will be used instead", + addr_uri.ok() ? addr_uri->c_str() + : addr_uri.status().ToString().c_str()); + } + // Return address with a channel arg containing LB token and stats object. + callback(EndpointAddresses( + addr, ChannelArgs().SetObject(MakeRefCounted( + std::move(lb_token), client_stats_)))); + } + } + + private: + RefCountedPtr serverlist_; + RefCountedPtr client_stats_; +}; + +// +// GrpcLb::Serverlist +// + +bool GrpcLb::Serverlist::operator==(const Serverlist& other) const { + return serverlist_ == other.serverlist_; +} + std::string GrpcLb::Serverlist::AsText() const { std::vector entries; for (size_t i = 0; i < serverlist_.size(); ++i) { @@ -642,59 +714,12 @@ std::string GrpcLb::Serverlist::AsText() const { return absl::StrJoin(entries, ""); } -bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) { - if (server.drop) return false; - if (GPR_UNLIKELY(server.port >> 16 != 0)) { - if (log) { - gpr_log(GPR_ERROR, - "Invalid port '%d' at index %" PRIuPTR - " of serverlist. Ignoring.", - server.port, idx); - } - return false; - } - if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) { - if (log) { - gpr_log(GPR_ERROR, - "Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR - " of serverlist. Ignoring", - server.ip_size, idx); - } - return false; - } - return true; -} - // Returns addresses extracted from the serverlist. -EndpointAddressesList GrpcLb::Serverlist::GetServerAddressList( - GrpcLbClientStats* client_stats) const { +std::shared_ptr +GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) { RefCountedPtr stats; if (client_stats != nullptr) stats = client_stats->Ref(); - EndpointAddressesList endpoints; - for (size_t i = 0; i < serverlist_.size(); ++i) { - const GrpcLbServer& server = serverlist_[i]; - if (!IsServerValid(server, i, false)) continue; - // Address processing. - grpc_resolved_address addr; - ParseServer(server, &addr); - // LB token processing. - const size_t lb_token_length = strnlen( - server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token)); - std::string lb_token(server.load_balance_token, lb_token_length); - if (lb_token.empty()) { - auto addr_uri = grpc_sockaddr_to_uri(&addr); - gpr_log(GPR_INFO, - "Missing LB token for backend address '%s'. The empty token will " - "be used instead", - addr_uri.ok() ? addr_uri->c_str() - : addr_uri.status().ToString().c_str()); - } - // Add address with a channel arg containing LB token and stats object. - endpoints.emplace_back( - addr, ChannelArgs().SetObject(MakeRefCounted( - std::move(lb_token), stats))); - } - return endpoints; + return std::make_shared(Ref(), std::move(stats)); } bool GrpcLb::Serverlist::ContainsAllDropEntries() const { @@ -1503,6 +1528,31 @@ void GrpcLb::ResetBackoffLocked() { } } +// Endpoint iterator wrapper to add null LB token attribute. +class GrpcLb::NullLbTokenEndpointIterator : public EndpointAddressesIterator { + public: + explicit NullLbTokenEndpointIterator( + std::shared_ptr parent_it) + : parent_it_(std::move(parent_it)) {} + + void ForEach(absl::FunctionRef callback) + const override { + parent_it_->ForEach([&](const EndpointAddresses& endpoint) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this, + endpoint.ToString().c_str()); + } + callback(EndpointAddresses(endpoint.addresses(), + endpoint.args().SetObject(empty_token_))); + }); + } + + private: + std::shared_ptr parent_it_; + RefCountedPtr empty_token_ = + MakeRefCounted("", nullptr); +}; + absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] received update", this); @@ -1512,19 +1562,11 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { GPR_ASSERT(config_ != nullptr); args_ = std::move(args.args); // Update fallback address list. - fallback_backend_addresses_ = std::move(args.addresses); - if (fallback_backend_addresses_.ok()) { - // Add null LB token attributes. - for (EndpointAddresses& endpoint : *fallback_backend_addresses_) { - endpoint = EndpointAddresses( - endpoint.addresses(), - endpoint.args().SetObject( - MakeRefCounted("", nullptr))); - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this, - endpoint.ToString().c_str()); - } - } + if (!args.addresses.ok()) { + fallback_backend_addresses_ = args.addresses.status(); + } else { + fallback_backend_addresses_ = std::make_shared( + std::move(*args.addresses)); } resolution_note_ = std::move(args.resolution_note); // Update balancer channel. @@ -1756,6 +1798,12 @@ OrphanablePtr GrpcLb::CreateChildPolicyLocked( return lb_policy; } +bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) { + bool empty = true; + endpoints.ForEach([&](const EndpointAddresses&) { empty = false; }); + return empty; +} + void GrpcLb::CreateOrUpdateChildPolicyLocked() { if (shutting_down_) return; // Construct update args. @@ -1769,16 +1817,17 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { // picks. update_args.addresses = fallback_backend_addresses_; if (fallback_backend_addresses_.ok() && - fallback_backend_addresses_->empty()) { + EndpointIteratorIsEmpty(**fallback_backend_addresses_)) { update_args.resolution_note = absl::StrCat( - "grpclb in fallback mode without any balancer addresses: ", + "grpclb in fallback mode without any fallback addresses: ", resolution_note_); } } else { update_args.addresses = serverlist_->GetServerAddressList( lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats()); is_backend_from_grpclb_load_balancer = true; - if (update_args.addresses.ok() && update_args.addresses->empty()) { + if (update_args.addresses.ok() && + EndpointIteratorIsEmpty(**update_args.addresses)) { update_args.resolution_note = "empty serverlist from grpclb balancer"; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc index 8569f24019d..656332b2128 100644 --- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc +++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc @@ -661,7 +661,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { if (args.addresses.ok()) { std::set current_endpoints; std::set current_addresses; - for (const EndpointAddresses& endpoint : *args.addresses) { + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { EndpointAddressSet key(endpoint.addresses()); current_endpoints.emplace(key); for (const grpc_resolved_address& address : endpoint.addresses()) { @@ -708,7 +708,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { } it->second->DisableEjection(); } - } + }); // Remove any entries we no longer need in the subchannel map. for (auto it = subchannel_state_map_.begin(); it != subchannel_state_map_.end();) { @@ -753,7 +753,6 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { update_args.addresses = std::move(args.addresses); update_args.resolution_note = std::move(args.resolution_note); update_args.config = config_->child_policy(); - // Update the policy. update_args.args = std::move(args.args); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 577744a3fff..e51248faa9c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -21,7 +21,6 @@ #include #include -#include #include #include #include @@ -114,7 +113,7 @@ class PickFirst : public LoadBalancingPolicy { public: class SubchannelData { public: - SubchannelData(SubchannelList* subchannel_list, + SubchannelData(SubchannelList* subchannel_list, size_t index, RefCountedPtr subchannel); SubchannelInterface* subchannel() const { return subchannel_.get(); } @@ -125,12 +124,6 @@ class PickFirst : public LoadBalancingPolicy { return connectivity_status_; } - // Returns the index into the subchannel list of this object. - size_t Index() const { - return static_cast(this - - &subchannel_list_->subchannels_.front()); - } - // Resets the connection backoff. void ResetBackoffLocked() { if (subchannel_ != nullptr) subchannel_->ResetBackoff(); @@ -153,10 +146,8 @@ class PickFirst : public LoadBalancingPolicy { class Watcher : public SubchannelInterface::ConnectivityStateWatcherInterface { public: - Watcher(SubchannelData* subchannel_data, - RefCountedPtr subchannel_list) - : subchannel_data_(subchannel_data), - subchannel_list_(std::move(subchannel_list)) {} + Watcher(RefCountedPtr subchannel_list, size_t index) + : subchannel_list_(std::move(subchannel_list)), index_(index) {} ~Watcher() override { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); @@ -164,8 +155,8 @@ class PickFirst : public LoadBalancingPolicy { void OnConnectivityStateChange(grpc_connectivity_state new_state, absl::Status status) override { - subchannel_data_->OnConnectivityStateChange(new_state, - std::move(status)); + subchannel_list_->subchannels_[index_].OnConnectivityStateChange( + new_state, std::move(status)); } grpc_pollset_set* interested_parties() override { @@ -173,8 +164,8 @@ class PickFirst : public LoadBalancingPolicy { } private: - SubchannelData* subchannel_data_; RefCountedPtr subchannel_list_; + const size_t index_; }; // This method will be invoked once soon after instantiation to report @@ -193,6 +184,7 @@ class PickFirst : public LoadBalancingPolicy { // Backpointer to owning subchannel list. Not owned. SubchannelList* subchannel_list_; + const size_t index_; // The subchannel. RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. @@ -205,7 +197,8 @@ class PickFirst : public LoadBalancingPolicy { }; SubchannelList(RefCountedPtr policy, - EndpointAddressesList addresses, const ChannelArgs& args); + EndpointAddressesIterator* addresses, + const ChannelArgs& args); ~SubchannelList() override; @@ -413,9 +406,9 @@ void PickFirst::ResetBackoffLocked() { void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { // Create a subchannel list from latest_update_args_. - EndpointAddressesList addresses; + EndpointAddressesIterator* addresses = nullptr; if (latest_update_args_.addresses.ok()) { - addresses = *latest_update_args_.addresses; + addresses = latest_update_args_.addresses->get(); } // Replace latest_pending_subchannel_list_. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) && @@ -425,7 +418,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { latest_pending_subchannel_list_.get()); } latest_pending_subchannel_list_ = MakeOrphanable( - Ref(), std::move(addresses), latest_update_args_.args); + Ref(), addresses, latest_update_args_.args); // Empty update or no valid subchannels. Put the channel in // TRANSIENT_FAILURE and request re-resolution. if (latest_pending_subchannel_list_->size() == 0) { @@ -483,9 +476,7 @@ class AddressFamilyIterator { absl::Status PickFirst::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { if (args.addresses.ok()) { - gpr_log(GPR_INFO, - "Pick First %p received update with %" PRIuPTR " addresses", this, - args.addresses->size()); + gpr_log(GPR_INFO, "Pick First %p received update", this); } else { gpr_log(GPR_INFO, "Pick First %p received update with address error: %s", this, args.addresses.status().ToString().c_str()); @@ -495,51 +486,59 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) { absl::Status status; if (!args.addresses.ok()) { status = args.addresses.status(); - } else if (args.addresses->empty()) { - status = absl::UnavailableError("address list must not be empty"); } else { - // Shuffle the list if needed. - auto config = static_cast(args.config.get()); - if (config->shuffle_addresses()) { - absl::c_shuffle(*args.addresses, bit_gen_); - } - // Flatten the list so that we have one address per endpoint. - // While we're iterating, also determine the desired address family - // order and the index of the first element of each family, for use in - // the interleaving below. - std::set address_families; - std::vector address_family_order; EndpointAddressesList endpoints; - for (const auto& endpoint : *args.addresses) { - for (const auto& address : endpoint.addresses()) { - endpoints.emplace_back(address, endpoint.args()); - if (IsPickFirstHappyEyeballsEnabled()) { - absl::string_view scheme = GetAddressFamily(address); - bool inserted = address_families.insert(scheme).second; - if (inserted) { - address_family_order.emplace_back(scheme, endpoints.size() - 1); + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + endpoints.push_back(endpoint); + }); + if (endpoints.empty()) { + status = absl::UnavailableError("address list must not be empty"); + } else { + // Shuffle the list if needed. + auto config = static_cast(args.config.get()); + if (config->shuffle_addresses()) { + absl::c_shuffle(endpoints, bit_gen_); + } + // Flatten the list so that we have one address per endpoint. + // While we're iterating, also determine the desired address family + // order and the index of the first element of each family, for use in + // the interleaving below. + std::set address_families; + std::vector address_family_order; + EndpointAddressesList flattened_endpoints; + for (const auto& endpoint : endpoints) { + for (const auto& address : endpoint.addresses()) { + flattened_endpoints.emplace_back(address, endpoint.args()); + if (IsPickFirstHappyEyeballsEnabled()) { + absl::string_view scheme = GetAddressFamily(address); + bool inserted = address_families.insert(scheme).second; + if (inserted) { + address_family_order.emplace_back(scheme, + flattened_endpoints.size() - 1); + } } } } - } - // Interleave addresses as per RFC-8305 section 4. - if (IsPickFirstHappyEyeballsEnabled()) { - EndpointAddressesList interleaved_endpoints; - interleaved_endpoints.reserve(endpoints.size()); - std::vector endpoints_moved(endpoints.size()); - size_t scheme_index = 0; - for (size_t i = 0; i < endpoints.size(); ++i) { - EndpointAddresses* endpoint; - do { - auto& iterator = address_family_order[scheme_index++ % - address_family_order.size()]; - endpoint = iterator.Next(endpoints, &endpoints_moved); - } while (endpoint == nullptr); - interleaved_endpoints.emplace_back(std::move(*endpoint)); + endpoints = std::move(flattened_endpoints); + // Interleave addresses as per RFC-8305 section 4. + if (IsPickFirstHappyEyeballsEnabled()) { + EndpointAddressesList interleaved_endpoints; + interleaved_endpoints.reserve(endpoints.size()); + std::vector endpoints_moved(endpoints.size()); + size_t scheme_index = 0; + for (size_t i = 0; i < endpoints.size(); ++i) { + EndpointAddresses* endpoint; + do { + auto& iterator = address_family_order[scheme_index++ % + address_family_order.size()]; + endpoint = iterator.Next(endpoints, &endpoints_moved); + } while (endpoint == nullptr); + interleaved_endpoints.emplace_back(std::move(*endpoint)); + } + endpoints = std::move(interleaved_endpoints); } - args.addresses = std::move(interleaved_endpoints); - } else { - args.addresses = std::move(endpoints); + args.addresses = + std::make_shared(std::move(endpoints)); } } // If the update contains a resolver error and we have a previous update @@ -617,18 +616,20 @@ void PickFirst::HealthWatcher::OnConnectivityStateChange( // PickFirst::SubchannelList::SubchannelData::SubchannelData( - SubchannelList* subchannel_list, + SubchannelList* subchannel_list, size_t index, RefCountedPtr subchannel) - : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) { + : subchannel_list_(subchannel_list), + index_(index), + subchannel_(std::move(subchannel)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "[PF %p] subchannel list %p index %" PRIuPTR " (subchannel %p): starting watch", - subchannel_list_->policy_.get(), subchannel_list_, - subchannel_list_->size(), subchannel_.get()); + subchannel_list_->policy_.get(), subchannel_list_, index_, + subchannel_.get()); } auto watcher = std::make_unique( - this, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher")); + subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_); pending_watcher_ = watcher.get(); subchannel_->WatchConnectivityState(std::move(watcher)); } @@ -639,7 +640,7 @@ void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() { gpr_log(GPR_INFO, "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): cancelling watch and unreffing subchannel", - subchannel_list_->policy_.get(), subchannel_list_, Index(), + subchannel_list_->policy_.get(), subchannel_list_, index_, subchannel_list_->size(), subchannel_.get()); } subchannel_->CancelConnectivityStateWatch(pending_watcher_); @@ -659,7 +660,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( "status=%s, shutting_down=%d, pending_watcher=%p, " "seen_transient_failure=%d, p->selected_=%p, " "p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p", - p, subchannel_list_, Index(), subchannel_list_->size(), + p, subchannel_list_, index_, subchannel_list_->size(), subchannel_.get(), (connectivity_state_.has_value() ? ConnectivityStateName(*connectivity_state_) @@ -771,7 +772,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( if (!IsPickFirstHappyEyeballsEnabled()) { // Ignore any other updates for subchannels we're not currently trying to // connect to. - if (Index() != subchannel_list_->attempting_index_) return; + if (index_ != subchannel_list_->attempting_index_) return; // React to the connectivity state. ReactToConnectivityStateLocked(); return; @@ -784,7 +785,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( if (!prev_seen_transient_failure && seen_transient_failure_) { // If a connection attempt fails before the timer fires, then // cancel the timer and start connecting on the next subchannel. - if (Index() == subchannel_list_->attempting_index_) { + if (index_ == subchannel_list_->attempting_index_) { if (subchannel_list_->timer_handle_.has_value()) { p->channel_control_helper()->GetEventEngine()->Cancel( *subchannel_list_->timer_handle_); @@ -858,7 +859,7 @@ void PickFirst::SubchannelList::SubchannelData:: // We skip subchannels in state TRANSIENT_FAILURE to avoid a // large recursion that could overflow the stack. SubchannelData* found_subchannel = nullptr; - for (size_t next_index = Index() + 1; + for (size_t next_index = index_ + 1; next_index < subchannel_list_->size(); ++next_index) { SubchannelData* sc = &subchannel_list_->subchannels_[next_index]; GPR_ASSERT(sc->connectivity_state_.has_value()); @@ -946,14 +947,14 @@ void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() { GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING); } // If this is not the last subchannel in the list, start the timer. - if (Index() != subchannel_list_->size() - 1) { + if (index_ != subchannel_list_->size() - 1) { PickFirst* p = subchannel_list_->policy_.get(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p subchannel list %p: starting Connection " "Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR, p, subchannel_list_, p->connection_attempt_delay_.millis(), - Index()); + index_); } subchannel_list_->timer_handle_ = p->channel_control_helper()->GetEventEngine()->RunAfter( @@ -1041,7 +1042,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() { } // Unref all other subchannels in the list. for (size_t i = 0; i < subchannel_list_->size(); ++i) { - if (i != Index()) { + if (i != index_) { subchannel_list_->subchannels_[i].ShutdownLocked(); } } @@ -1052,7 +1053,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() { // PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy, - EndpointAddressesList addresses, + EndpointAddressesIterator* addresses, const ChannelArgs& args) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList" @@ -1062,14 +1063,12 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy, .Remove( GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, - "[PF %p] Creating subchannel list %p for %" PRIuPTR - " subchannels - channel args: %s", - policy_.get(), this, addresses.size(), args_.ToString().c_str()); + gpr_log(GPR_INFO, "[PF %p] Creating subchannel list %p - channel args: %s", + policy_.get(), this, args_.ToString().c_str()); } - subchannels_.reserve(addresses.size()); + if (addresses == nullptr) return; // Create a subchannel for each address. - for (const EndpointAddresses& address : addresses) { + addresses->ForEach([&](const EndpointAddresses& address) { GPR_ASSERT(address.addresses().size() == 1); RefCountedPtr subchannel = policy_->channel_control_helper()->CreateSubchannel( @@ -1081,7 +1080,7 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy, "[PF %p] could not create subchannel for address %s, ignoring", policy_.get(), address.ToString().c_str()); } - continue; + return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, @@ -1090,8 +1089,8 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy, policy_.get(), this, subchannels_.size(), subchannel.get(), address.ToString().c_str()); } - subchannels_.emplace_back(this, std::move(subchannel)); - } + subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel)); + }); } PickFirst::SubchannelList::~SubchannelList() { diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc index e1e21072bc0..cdfe6adceaf 100644 --- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc +++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc @@ -684,7 +684,8 @@ absl::Status PriorityLb::ChildPriority::UpdateLocked( if (priority_policy_->addresses_.ok()) { auto it = priority_policy_->addresses_->find(name_); if (it == priority_policy_->addresses_->end()) { - update_args.addresses.emplace(); + update_args.addresses = std::make_shared( + EndpointAddressesList()); } else { update_args.addresses = it->second; } diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 5881f168c86..c1c52c7de5f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -554,7 +554,8 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() { GPR_ASSERT(config.ok()); // Update child policy. LoadBalancingPolicy::UpdateArgs update_args; - update_args.addresses.emplace().emplace_back(ring_hash_->endpoints_[index_]); + update_args.addresses = + std::make_shared(ring_hash_->endpoints_[index_]); update_args.args = ring_hash_->args_; update_args.config = std::move(*config); // TODO(roth): If the child reports a non-OK status with the update, @@ -622,18 +623,14 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) { // Check address list. if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { - gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[RH %p] received update", this); } // De-dup endpoints, taking weight into account. endpoints_.clear(); - endpoints_.reserve(args.addresses->size()); std::map endpoint_indices; - size_t num_skipped = 0; - for (size_t i = 0; i < args.addresses->size(); ++i) { - EndpointAddresses& endpoint = (*args.addresses)[i]; + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { const EndpointAddressSet key(endpoint.addresses()); - auto p = endpoint_indices.emplace(key, i - num_skipped); + auto p = endpoint_indices.emplace(key, endpoints_.size()); if (!p.second) { // Duplicate endpoint. Combine weights and skip the dup. EndpointAddresses& prev_endpoint = endpoints_[p.first->second]; @@ -651,11 +648,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) { prev_endpoint.addresses(), prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT, weight_arg + prev_weight_arg)); - ++num_skipped; } else { - endpoints_.push_back(std::move(endpoint)); + endpoints_.push_back(endpoint); } - } + }); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc index f51ef89a0be..1521fafcfc4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc +++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc @@ -707,7 +707,7 @@ class RlsLb : public LoadBalancingPolicy { OrphanablePtr rls_channel_ ABSL_GUARDED_BY(mu_); // Accessed only from within WorkSerializer. - absl::StatusOr addresses_; + absl::StatusOr> addresses_; ChannelArgs channel_args_; RefCountedPtr config_; RefCountedPtr default_child_policy_; @@ -1858,6 +1858,27 @@ RlsLb::RlsLb(Args args) : LoadBalancingPolicy(std::move(args)), cache_(this) { } } +bool EndpointsEqual( + const absl::StatusOr> endpoints1, + const absl::StatusOr> + endpoints2) { + if (endpoints1.status() != endpoints2.status()) return false; + if (endpoints1.ok()) { + std::vector e1_list; + (*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) { + e1_list.push_back(endpoint); + }); + size_t i = 0; + bool different = false; + (*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) { + if (endpoint != e1_list[i++]) different = true; + }); + if (different) return false; + if (i != e1_list.size()) return false; + } + return true; +} + absl::Status RlsLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) { gpr_log(GPR_INFO, "[rlslb %p] policy updated", this); @@ -1875,7 +1896,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { // Swap out addresses. // If the new address list is an error and we have an existing address list, // stick with the existing addresses. - absl::StatusOr old_addresses; + absl::StatusOr> old_addresses; if (args.addresses.ok()) { old_addresses = std::move(addresses_); addresses_ = std::move(args.addresses); @@ -1888,7 +1909,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) { bool update_child_policies = old_config == nullptr || old_config->child_policy_config() != config_->child_policy_config() || - old_addresses != addresses_ || args.args != channel_args_; + !EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_; // If default target changes, swap out child policy. bool created_default_child = false; if (old_config == nullptr || diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 51e89c8e5b3..1c893fb1dcb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -125,14 +125,14 @@ class OldRoundRobin : public LoadBalancingPolicy { : public SubchannelList { public: - RoundRobinSubchannelList(OldRoundRobin* policy, ServerAddressList addresses, + RoundRobinSubchannelList(OldRoundRobin* policy, + EndpointAddressesIterator* addresses, const ChannelArgs& args) : SubchannelList(policy, (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) ? "RoundRobinSubchannelList" : nullptr), - std::move(addresses), policy->channel_control_helper(), - args) { + addresses, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' // pollset_sets will include the LB policy's pollset_set. @@ -277,13 +277,12 @@ void OldRoundRobin::ResetBackoffLocked() { } absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) { - ServerAddressList addresses; + EndpointAddressesIterator* addresses = nullptr; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[RR %p] received update", this); } - addresses = std::move(*args.addresses); + addresses = args.addresses->get(); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, @@ -299,8 +298,8 @@ absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_ = MakeRefCounted( - this, std::move(addresses), args.args); + latest_pending_subchannel_list_ = + MakeRefCounted(this, addresses, args.args); latest_pending_subchannel_list_->StartWatchingLocked(args.args); // If the new list is empty, immediately promote it to // subchannel_list_ and report TRANSIENT_FAILURE. @@ -524,7 +523,7 @@ class RoundRobin : public LoadBalancingPolicy { class RoundRobinEndpointList : public EndpointList { public: RoundRobinEndpointList(RefCountedPtr round_robin, - const EndpointAddressesList& endpoints, + EndpointAddressesIterator* endpoints, const ChannelArgs& args) : EndpointList(std::move(round_robin), GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) @@ -687,13 +686,12 @@ void RoundRobin::ResetBackoffLocked() { } absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { - EndpointAddressesList addresses; + EndpointAddressesIterator* addresses = nullptr; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " endpoints", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[RR %p] received update", this); } - addresses = std::move(*args.addresses); + addresses = args.addresses->get(); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this, @@ -710,8 +708,7 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) { latest_pending_endpoint_list_.get()); } latest_pending_endpoint_list_ = MakeOrphanable( - Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), std::move(addresses), - args.args); + Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), addresses, args.args); // If the new list is empty, immediately promote it to // endpoint_list_ and report TRANSIENT_FAILURE. if (latest_pending_endpoint_list_->size() == 0) { diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index cb52ad92daa..b5076a9f492 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/subchannel_interface.h" +#include "src/core/lib/resolver/endpoint_addresses.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" @@ -208,7 +209,7 @@ class SubchannelList : public DualRefCounted { protected: SubchannelList(LoadBalancingPolicy* policy, const char* tracer, - ServerAddressList addresses, + EndpointAddressesIterator* addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args); @@ -365,19 +366,18 @@ void SubchannelData::ShutdownLocked() { template SubchannelList::SubchannelList( LoadBalancingPolicy* policy, const char* tracer, - ServerAddressList addresses, + EndpointAddressesIterator* addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args) : DualRefCounted(tracer), policy_(policy), tracer_(tracer) { if (GPR_UNLIKELY(tracer_ != nullptr)) { - gpr_log(GPR_INFO, - "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", - tracer_, policy, this, addresses.size()); + gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p", tracer_, policy, + this); } - subchannels_.reserve(addresses.size()); + if (addresses == nullptr) return; // Create a subchannel for each address. - for (ServerAddress address : addresses) { + addresses->ForEach([&](const EndpointAddresses& address) { RefCountedPtr subchannel = helper->CreateSubchannel(address.address(), address.args(), args); if (subchannel == nullptr) { @@ -387,7 +387,7 @@ SubchannelList::SubchannelList( "[%s %p] could not create subchannel for address %s, ignoring", tracer_, policy_, address.ToString().c_str()); } - continue; + return; } if (GPR_UNLIKELY(tracer_ != nullptr)) { gpr_log(GPR_INFO, @@ -397,8 +397,8 @@ SubchannelList::SubchannelList( address.ToString().c_str()); } subchannels_.emplace_back(); - subchannels_.back().Init(this, std::move(address), std::move(subchannel)); - } + subchannels_.back().Init(this, address, std::move(subchannel)); + }); } template diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 102cd6aa7cf..aa2c6f5bb66 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -247,14 +247,13 @@ class OldWeightedRoundRobin : public LoadBalancingPolicy { WeightedRoundRobinSubchannelData> { public: WeightedRoundRobinSubchannelList(OldWeightedRoundRobin* policy, - ServerAddressList addresses, + EndpointAddressesIterator* addresses, const ChannelArgs& args) : SubchannelList(policy, (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) ? "WeightedRoundRobinSubchannelList" : nullptr), - std::move(addresses), policy->channel_control_helper(), - args) { + addresses, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' // pollset_sets will include the LB policy's pollset_set. @@ -675,11 +674,10 @@ void OldWeightedRoundRobin::ResetBackoffLocked() { absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { global_stats().IncrementWrrUpdates(); config_ = std::move(args.config); - ServerAddressList addresses; + std::shared_ptr addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[WRR %p] received update", this); } // Weed out duplicate addresses. Also sort the addresses so that if // the set of the addresses don't change, their indexes in the @@ -698,10 +696,12 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { return memcmp(addr1.addr, addr2.addr, addr1.len) < 0; } }; - std::set ordered_addresses( - args.addresses->begin(), args.addresses->end()); - addresses = - ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()); + std::set ordered_addresses; + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + ordered_addresses.insert(endpoint); + }); + addresses = std::make_shared( + ServerAddressList(ordered_addresses.begin(), ordered_addresses.end())); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this, @@ -718,8 +718,8 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { this, latest_pending_subchannel_list_.get()); } latest_pending_subchannel_list_ = - MakeRefCounted( - this, std::move(addresses), args.args); + MakeRefCounted(this, addresses.get(), + args.args); latest_pending_subchannel_list_->StartWatchingLocked(args.args); // If the new list is empty, immediately promote it to // subchannel_list_ and report TRANSIENT_FAILURE. @@ -1079,7 +1079,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy { }; WrrEndpointList(RefCountedPtr wrr, - const EndpointAddressesList& endpoints, + EndpointAddressesIterator* endpoints, const ChannelArgs& args) : EndpointList(std::move(wrr), GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) @@ -1516,11 +1516,10 @@ void WeightedRoundRobin::ResetBackoffLocked() { absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { global_stats().IncrementWrrUpdates(); config_ = std::move(args.config); - EndpointAddressesList addresses; + std::shared_ptr addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[WRR %p] received update", this); } // Weed out duplicate endpoints. Also sort the endpoints so that if // the set of endpoints doesn't change, their indexes in the endpoint @@ -1539,10 +1538,13 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { return e1 < e2; } }; - std::set ordered_addresses( - args.addresses->begin(), args.addresses->end()); - addresses = EndpointAddressesList(ordered_addresses.begin(), - ordered_addresses.end()); + std::set ordered_addresses; + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + ordered_addresses.insert(endpoint); + }); + addresses = + std::make_shared(EndpointAddressesList( + ordered_addresses.begin(), ordered_addresses.end())); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this, @@ -1559,7 +1561,7 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { this, latest_pending_endpoint_list_.get()); } latest_pending_endpoint_list_ = - MakeOrphanable(Ref(), std::move(addresses), args.args); + MakeOrphanable(Ref(), addresses.get(), args.args); // If the new list is empty, immediately promote it to // endpoint_list_ and report TRANSIENT_FAILURE. if (latest_pending_endpoint_list_->size() == 0) { diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc index eecd112445a..0639ce9b094 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc @@ -157,10 +157,10 @@ class WeightedTargetLb : public LoadBalancingPolicy { void Orphan() override; - absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config, - absl::StatusOr addresses, - const std::string& resolution_note, - const ChannelArgs& args); + absl::Status UpdateLocked( + const WeightedTargetLbConfig::ChildConfig& config, + absl::StatusOr> addresses, + const std::string& resolution_note, const ChannelArgs& args); void ResetBackoffLocked(); void DeactivateLocked(); @@ -338,11 +338,12 @@ absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) { target = MakeOrphanable( Ref(DEBUG_LOCATION, "WeightedChild"), name); } - absl::StatusOr addresses; + absl::StatusOr> addresses; if (address_map.ok()) { auto it = address_map->find(name); if (it == address_map->end()) { - addresses.emplace(); + addresses = std::make_shared( + EndpointAddressesList()); } else { addresses = std::move(it->second); } @@ -589,7 +590,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked( absl::Status WeightedTargetLb::WeightedChild::UpdateLocked( const WeightedTargetLbConfig::ChildConfig& config, - absl::StatusOr addresses, + absl::StatusOr> addresses, const std::string& resolution_note, const ChannelArgs& args) { if (weighted_target_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc index 50740151c9d..8548de565d6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc @@ -250,7 +250,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy { OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); absl::Status UpdateChildPolicyLocked( - absl::StatusOr addresses, + absl::StatusOr> addresses, std::string resolution_note, const ChannelArgs& args); void MaybeUpdatePickerLocked(); @@ -569,7 +569,7 @@ OrphanablePtr XdsClusterImplLb::CreateChildPolicyLocked( } absl::Status XdsClusterImplLb::UpdateChildPolicyLocked( - absl::StatusOr addresses, + absl::StatusOr> addresses, std::string resolution_note, const ChannelArgs& args) { // Create policy if needed. if (child_policy_ == nullptr) { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc index 4f6e8611b5a..e613b3eb0d2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc @@ -149,7 +149,8 @@ class XdsClusterManagerLb : public LoadBalancingPolicy { absl::Status UpdateLocked( RefCountedPtr config, - const absl::StatusOr& addresses, + const absl::StatusOr>& + addresses, const ChannelArgs& args); void ExitIdleLocked(); void ResetBackoffLocked(); @@ -482,7 +483,7 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked( absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked( RefCountedPtr config, - const absl::StatusOr& addresses, + const absl::StatusOr>& addresses, const ChannelArgs& args) { if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus(); // Update child weight. diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc index 42f8673e724..6a58213347a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc @@ -28,6 +28,7 @@ #include #include +#include "absl/functional/function_ref.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -61,6 +62,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" @@ -390,7 +392,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy { absl::Status UpdateChildPolicyLocked(); OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); - EndpointAddressesList CreateChildPolicyAddressesLocked(); + std::shared_ptr CreateChildPolicyAddressesLocked(); std::string CreateChildPolicyResolutionNoteLocked(); RefCountedPtr CreateChildPolicyConfigLocked(); ChannelArgs CreateChildPolicyArgsLocked(const ChannelArgs& args_in); @@ -529,10 +531,16 @@ XdsClusterResolverLb::DiscoveryMechanismEntry::config() const { ->config_->discovery_mechanisms()[discovery_mechanism->index()]; } +std::string MakeChildPolicyName(absl::string_view cluster_name, + size_t child_number) { + return absl::StrCat("{cluster=", cluster_name, + ", child_number=", child_number, "}"); +} + std::string XdsClusterResolverLb::DiscoveryMechanismEntry::GetChildPolicyName( size_t priority) const { - return absl::StrCat("{cluster=", config().cluster_name, - ", child_number=", priority_child_numbers[priority], "}"); + return MakeChildPolicyName(config().cluster_name, + priority_child_numbers[priority]); } // @@ -768,39 +776,76 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index, // child policy-related methods // -EndpointAddressesList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { - EndpointAddressesList addresses; - for (const auto& discovery_entry : discovery_mechanisms_) { - const auto& priority_list = - GetUpdatePriorityList(*discovery_entry.latest_update); - for (size_t priority = 0; priority < priority_list.size(); ++priority) { - const auto& priority_entry = priority_list[priority]; - std::string priority_child_name = - discovery_entry.GetChildPolicyName(priority); - for (const auto& p : priority_entry.localities) { - const auto& locality_name = p.first; - const auto& locality = p.second; - std::vector hierarchical_path = { - RefCountedStringValue(priority_child_name), - RefCountedStringValue(locality_name->AsHumanReadableString())}; - auto hierarchical_path_attr = - MakeRefCounted(std::move(hierarchical_path)); - for (const auto& endpoint : locality.endpoints) { - uint32_t endpoint_weight = - locality.lb_weight * - endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1); - addresses.emplace_back( - endpoint.addresses(), - endpoint.args() - .SetObject(hierarchical_path_attr) - .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight) - .SetObject(locality_name->Ref()) - .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)); +class PriorityEndpointIterator : public EndpointAddressesIterator { + public: + struct DiscoveryMechanismResult { + std::shared_ptr update; + std::string cluster_name; + std::vector priority_child_numbers; + + DiscoveryMechanismResult( + std::shared_ptr resource, + std::string cluster, std::vector child_numbers) + : update(std::move(resource)), + cluster_name(std::move(cluster)), + priority_child_numbers(std::move(child_numbers)) {} + + std::string GetChildPolicyName(size_t priority) const { + return MakeChildPolicyName(cluster_name, + priority_child_numbers[priority]); + } + }; + + explicit PriorityEndpointIterator( + std::vector results) + : results_(std::move(results)) {} + + void ForEach(absl::FunctionRef callback) + const override { + for (const auto& entry : results_) { + const auto& priority_list = GetUpdatePriorityList(*entry.update); + for (size_t priority = 0; priority < priority_list.size(); ++priority) { + const auto& priority_entry = priority_list[priority]; + std::string priority_child_name = entry.GetChildPolicyName(priority); + for (const auto& p : priority_entry.localities) { + const auto& locality_name = p.first; + const auto& locality = p.second; + std::vector hierarchical_path = { + RefCountedStringValue(priority_child_name), + RefCountedStringValue(locality_name->AsHumanReadableString())}; + auto hierarchical_path_attr = + MakeRefCounted(std::move(hierarchical_path)); + for (const auto& endpoint : locality.endpoints) { + uint32_t endpoint_weight = + locality.lb_weight * + endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1); + callback(EndpointAddresses( + endpoint.addresses(), + endpoint.args() + .SetObject(hierarchical_path_attr) + .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight) + .SetObject(locality_name->Ref()) + .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight))); + } } } } } - return addresses; + + private: + std::vector results_; +}; + +std::shared_ptr +XdsClusterResolverLb::CreateChildPolicyAddressesLocked() { + std::vector entries; + entries.reserve(discovery_mechanisms_.size()); + for (const auto& discovery_entry : discovery_mechanisms_) { + entries.emplace_back(discovery_entry.latest_update, + discovery_entry.config().cluster_name, + discovery_entry.priority_child_numbers); + } + return std::make_shared(std::move(entries)); } std::string XdsClusterResolverLb::CreateChildPolicyResolutionNoteLocked() { diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc index 7a1c6792a6c..c42219b4c42 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc @@ -18,7 +18,6 @@ #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h" -#include #include #include @@ -34,6 +33,7 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/functional/function_ref.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -300,8 +300,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy { void MaybeUpdatePickerLocked(); - absl::StatusOr UpdateAddressMap( - absl::StatusOr endpoints); + void UpdateAddressMap(const EndpointAddressesIterator& endpoints); RefCountedPtr AdoptSubchannel( const grpc_resolved_address& address, @@ -508,12 +507,36 @@ void XdsOverrideHostLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } +// Wraps the endpoint iterator and filters out endpoints in state DRAINING. +class ChildEndpointIterator : public EndpointAddressesIterator { + public: + explicit ChildEndpointIterator( + std::shared_ptr parent_it) + : parent_it_(std::move(parent_it)) {} + + void ForEach(absl::FunctionRef callback) + const override { + parent_it_->ForEach([&](const EndpointAddresses& endpoint) { + XdsHealthStatus status = GetEndpointHealthStatus(endpoint); + if (status.status() != XdsHealthStatus::kDraining) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, + "[xds_override_host_lb %p] endpoint %s: not draining, " + "passing to child", + this, endpoint.ToString().c_str()); + } + callback(endpoint); + } + }); + } + + private: + std::shared_ptr parent_it_; +}; + absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, - "[xds_override_host_lb %p] Received update with %" PRIuPTR - " addresses", - this, args.addresses.ok() ? args.addresses->size() : 0); + gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this); } auto old_config = std::move(config_); // Update config. @@ -521,13 +544,24 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) { if (config_ == nullptr) { return absl::InvalidArgumentError("Missing policy config"); } + // Update address map and wrap endpoint iterator for child policy. + if (args.addresses.ok()) { + UpdateAddressMap(**args.addresses); + args.addresses = + std::make_shared(std::move(*args.addresses)); + } else { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { + gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this, + args.addresses.status().ToString().c_str()); + } + } // Create child policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args.args); } // Update child policy. UpdateArgs update_args; - update_args.addresses = UpdateAddressMap(std::move(args.addresses)); + update_args.addresses = std::move(args.addresses); update_args.resolution_note = std::move(args.resolution_note); update_args.config = config_->child_config(); update_args.args = std::move(args.args); @@ -578,18 +612,9 @@ OrphanablePtr XdsOverrideHostLb::CreateChildPolicyLocked( return lb_policy; } -absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( - absl::StatusOr endpoints) { - if (!endpoints.ok()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this, - endpoints.status().ToString().c_str()); - } - return endpoints; - } - // Construct the list of addresses to pass to the child policy and a - // map of address info from which to update subchannel_map_. - EndpointAddressesList child_addresses; +void XdsOverrideHostLb::UpdateAddressMap( + const EndpointAddressesIterator& endpoints) { + // Construct a map of address info from which to update subchannel_map_. struct AddressInfo { XdsHealthStatus eds_health_status; RefCountedStringValue address_list; @@ -597,25 +622,18 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( : eds_health_status(status), address_list(std::move(addresses)) {} }; std::map addresses_for_map; - for (const auto& endpoint : *endpoints) { + endpoints.ForEach([&](const EndpointAddresses& endpoint) { XdsHealthStatus status = GetEndpointHealthStatus(endpoint); - if (status.status() != XdsHealthStatus::kDraining) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { - gpr_log(GPR_INFO, - "[xds_override_host_lb %p] endpoint %s: not draining, " - "passing to child", - this, endpoint.ToString().c_str()); - } - child_addresses.push_back(endpoint); - } else if (!config_->override_host_status_set().Contains(status)) { - // Skip draining hosts if not in the override status set. + // Skip draining hosts if not in the override status set. + if (status.status() == XdsHealthStatus::kDraining && + !config_->override_host_status_set().Contains(status)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) { gpr_log(GPR_INFO, "[xds_override_host_lb %p] endpoint %s: draining but not in " "override_host_status set -- ignoring", this, endpoint.ToString().c_str()); } - continue; + return; } std::vector addresses; addresses.reserve(endpoint.addresses().size()); @@ -641,7 +659,7 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( std::piecewise_construct, std::forward_as_tuple(addresses[i]), std::forward_as_tuple(status, std::move(address_list))); } - } + }); // Now grab the lock and update subchannel_map_ from addresses_for_map. { MutexLock lock(&subchannel_map_mu_); @@ -688,7 +706,6 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap( it->second.set_address_list(std::move(address_info.address_list)); } } - return child_addresses; } RefCountedPtr diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc index 26f0ec9084a..2d346217e47 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -170,10 +169,10 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { // Scan the addresses to find the weight for each locality. std::map locality_weights; if (args.addresses.ok()) { - for (const auto& address : *args.addresses) { - auto* locality_name = address.args().GetObject(); + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + auto* locality_name = endpoint.args().GetObject(); uint32_t weight = - address.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0); + endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0); if (locality_name != nullptr && weight > 0) { auto p = locality_weights.emplace( locality_name->AsHumanReadableString(), weight); @@ -184,7 +183,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) { p.first->first.c_str(), p.first->second, weight); } } - } + }); } // Construct the config for the weighted_target policy. Json::Object weighted_targets; diff --git a/src/core/lib/gprpp/ref_counted_string.h b/src/core/lib/gprpp/ref_counted_string.h index 62fb15ded98..dbe32b113da 100644 --- a/src/core/lib/gprpp/ref_counted_string.h +++ b/src/core/lib/gprpp/ref_counted_string.h @@ -104,6 +104,19 @@ inline bool operator==(const RefCountedStringValue& lhs, return lhs.as_string_view() == rhs.as_string_view(); } +inline bool operator!=(const RefCountedStringValue& lhs, + absl::string_view rhs) { + return lhs.as_string_view() != rhs; +} +inline bool operator!=(absl::string_view lhs, + const RefCountedStringValue& rhs) { + return lhs != rhs.as_string_view(); +} +inline bool operator!=(const RefCountedStringValue& lhs, + const RefCountedStringValue& rhs) { + return lhs.as_string_view() != rhs.as_string_view(); +} + inline bool operator<(const RefCountedStringValue& lhs, absl::string_view rhs) { return lhs.as_string_view() < rhs; } diff --git a/src/core/lib/load_balancing/lb_policy.h b/src/core/lib/load_balancing/lb_policy.h index 2422339274e..a36280c163b 100644 --- a/src/core/lib/load_balancing/lb_policy.h +++ b/src/core/lib/load_balancing/lb_policy.h @@ -346,7 +346,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { struct UpdateArgs { /// A list of endpoints, each with one or more address, or an error /// indicating a failure to obtain the list of addresses. - absl::StatusOr addresses; + absl::StatusOr> addresses; /// The LB policy config. RefCountedPtr config; /// A human-readable note providing context about the name resolution that diff --git a/src/core/lib/resolver/endpoint_addresses.cc b/src/core/lib/resolver/endpoint_addresses.cc index 7995163ad5d..7b5f4c0a329 100644 --- a/src/core/lib/resolver/endpoint_addresses.cc +++ b/src/core/lib/resolver/endpoint_addresses.cc @@ -22,7 +22,6 @@ #include -#include #include #include #include diff --git a/src/core/lib/resolver/endpoint_addresses.h b/src/core/lib/resolver/endpoint_addresses.h index 9d0d7c6edb8..5746df19c5b 100644 --- a/src/core/lib/resolver/endpoint_addresses.h +++ b/src/core/lib/resolver/endpoint_addresses.h @@ -23,8 +23,11 @@ #include #include +#include #include +#include "absl/functional/function_ref.h" + #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolved_address.h" @@ -64,6 +67,9 @@ class EndpointAddresses { bool operator==(const EndpointAddresses& other) const { return Cmp(other) == 0; } + bool operator!=(const EndpointAddresses& other) const { + return Cmp(other) != 0; + } bool operator<(const EndpointAddresses& other) const { return Cmp(other) < 0; } @@ -111,6 +117,48 @@ class EndpointAddressSet { std::set addresses_; }; +// An iterator interface for endpoints. +class EndpointAddressesIterator { + public: + virtual ~EndpointAddressesIterator() = default; + + // Invokes callback once for each endpoint. + virtual void ForEach( + absl::FunctionRef callback) const = 0; +}; + +// Iterator over a fixed list of endpoints. +class EndpointAddressesListIterator : public EndpointAddressesIterator { + public: + explicit EndpointAddressesListIterator(EndpointAddressesList endpoints) + : endpoints_(std::move(endpoints)) {} + + void ForEach(absl::FunctionRef callback) + const override { + for (const auto& endpoint : endpoints_) { + callback(endpoint); + } + } + + private: + EndpointAddressesList endpoints_; +}; + +// Iterator that returns only a single endpoint. +class SingleEndpointIterator : public EndpointAddressesIterator { + public: + explicit SingleEndpointIterator(EndpointAddresses endpoint) + : endpoint_(std::move(endpoint)) {} + + void ForEach(absl::FunctionRef callback) + const override { + callback(endpoint_); + } + + private: + EndpointAddresses endpoint_; +}; + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_RESOLVER_ENDPOINT_ADDRESSES_H diff --git a/test/core/client_channel/lb_policy/lb_policy_test_lib.h b/test/core/client_channel/lb_policy/lb_policy_test_lib.h index 81365410c48..27d7f38285b 100644 --- a/test/core/client_channel/lb_policy/lb_policy_test_lib.h +++ b/test/core/client_channel/lb_policy/lb_policy_test_lib.h @@ -773,7 +773,8 @@ class LoadBalancingPolicyTest : public ::testing::Test { absl::Span endpoints, RefCountedPtr config) { LoadBalancingPolicy::UpdateArgs update; - update.addresses.emplace(endpoints.begin(), endpoints.end()); + update.addresses = std::make_shared( + EndpointAddressesList(endpoints.begin(), endpoints.end())); update.config = std::move(config); return update; } diff --git a/test/core/client_channel/lb_policy/xds_override_host_test.cc b/test/core/client_channel/lb_policy/xds_override_host_test.cc index f3bf82fc6be..7e9d45d7f63 100644 --- a/test/core/client_channel/lb_policy/xds_override_host_test.cc +++ b/test/core/client_channel/lb_policy/xds_override_host_test.cc @@ -99,11 +99,13 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest { "HEALTHY"}) { LoadBalancingPolicy::UpdateArgs update; update.config = MakeXdsOverrideHostConfig(override_host_status); - update.addresses.emplace(); + EndpointAddressesList endpoints; for (auto address_and_status : addresses_and_statuses) { - update.addresses->push_back(MakeAddressWithHealthStatus( + endpoints.push_back(MakeAddressWithHealthStatus( address_and_status.first, address_and_status.second)); } + update.addresses = + std::make_shared(std::move(endpoints)); EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus()); } diff --git a/test/core/util/test_lb_policies.cc b/test/core/util/test_lb_policies.cc index 8fb255d65ed..f7e5abe6e2c 100644 --- a/test/core/util/test_lb_policies.cc +++ b/test/core/util/test_lb_policies.cc @@ -414,17 +414,19 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy { config->address().c_str()); auto uri = URI::Parse(config->address()); args.config.reset(); - args.addresses = EndpointAddressesList(); + EndpointAddressesList addresses; if (uri.ok()) { grpc_resolved_address address; GPR_ASSERT(grpc_parse_uri(*uri, &address)); - args.addresses->emplace_back(address, ChannelArgs()); + addresses.emplace_back(address, ChannelArgs()); } else { gpr_log(GPR_ERROR, "%s: could not parse URI (%s), using empty address list", kFixedAddressLbPolicyName, uri.status().ToString().c_str()); args.resolution_note = "no address in fixed_address_lb policy"; } + args.addresses = + std::make_shared(std::move(addresses)); return ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args)); }