[LB policy API] pass address lists down via an iterator interface (#34753)

This avoids storing unnecessary copies of the address list in each node of the LB policy tree.

Closes #34753

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/34753 from markdroth:lb_address_list_iterator 1d39465fbc
PiperOrigin-RevId: 582891475
pull/34915/head^2
Mark D. Roth 1 year ago committed by Copybara-Service
parent 5f41fde4f2
commit fcdc9b4d29
  1. 1
      BUILD
  2. 9
      src/core/BUILD
  3. 7
      src/core/ext/filters/client_channel/client_channel.cc
  4. 75
      src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
  5. 5
      src/core/ext/filters/client_channel/lb_policy/address_filtering.h
  6. 17
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
  8. 195
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  9. 5
      src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
  10. 167
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  11. 3
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  12. 18
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  13. 27
      src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
  14. 29
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  15. 20
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  16. 44
      src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
  17. 15
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  18. 4
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
  19. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  20. 109
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
  21. 85
      src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
  22. 9
      src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
  23. 13
      src/core/lib/gprpp/ref_counted_string.h
  24. 2
      src/core/lib/load_balancing/lb_policy.h
  25. 1
      src/core/lib/resolver/endpoint_addresses.cc
  26. 48
      src/core/lib/resolver/endpoint_addresses.h
  27. 3
      test/core/client_channel/lb_policy/lb_policy_test_lib.h
  28. 6
      test/core/client_channel/lb_policy/xds_override_host_test.cc
  29. 6
      test/core/util/test_lb_policies.cc

@ -2941,6 +2941,7 @@ grpc_cc_library(
"//src/core:lib/resolver/endpoint_addresses.h",
],
external_deps = [
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",

@ -4140,6 +4140,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4667,6 +4668,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc",
],
external_deps = [
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -4689,6 +4691,7 @@ grpc_cc_library(
"no_destruct",
"pollset_set",
"ref_counted_string",
"resolved_address",
"validation_errors",
"//:channel_arg_names",
"//:config",
@ -4837,6 +4840,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/address_filtering.h",
],
external_deps = [
"absl/functional:function_ref",
"absl/status:statusor",
"absl/strings",
],
@ -4845,6 +4849,7 @@ grpc_cc_library(
"channel_args",
"ref_counted",
"ref_counted_string",
"resolved_address",
"//:endpoint_addresses",
"//:gpr_platform",
"//:ref_counted_ptr",
@ -4914,6 +4919,7 @@ grpc_cc_library(
"lb_policy",
"subchannel_interface",
"//:debug_location",
"//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:ref_counted_ptr",
@ -4931,7 +4937,7 @@ grpc_cc_library(
"ext/filters/client_channel/lb_policy/endpoint_list.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
@ -5325,6 +5331,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/functional:function_ref",
"absl/status",
"absl/status:statusor",
"absl/strings",

@ -1599,7 +1599,12 @@ absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
Resolver::Result result) {
// Construct update.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
if (!result.addresses.ok()) {
update_args.addresses = result.addresses.status();
} else {
update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
std::move(*result.addresses));
}
update_args.config = std::move(lb_policy_config);
update_args.resolution_note = std::move(result.resolution_note);
// Remove the config selector from channel args so that we're not holding

@ -20,11 +20,13 @@
#include <stddef.h>
#include <algorithm>
#include <utility>
#include "absl/functional/function_ref.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_core {
@ -43,32 +45,63 @@ int HierarchicalPathArg::ChannelArgsCompare(const HierarchicalPathArg* a,
return 0;
}
namespace {
class HierarchicalAddressIterator : public EndpointAddressesIterator {
public:
HierarchicalAddressIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it,
RefCountedStringValue child_name)
: parent_it_(std::move(parent_it)), child_name_(std::move(child_name)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
RefCountedPtr<HierarchicalPathArg> remaining_path_attr;
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
const auto* path_arg = endpoint.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) return;
const std::vector<RefCountedStringValue>& path = path_arg->path();
auto it = path.begin();
if (it == path.end()) return;
if (*it != child_name_) return;
ChannelArgs args = endpoint.args();
++it;
if (it != path.end()) {
std::vector<RefCountedStringValue> remaining_path(it, path.end());
if (remaining_path_attr == nullptr ||
remaining_path_attr->path() != remaining_path) {
remaining_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(remaining_path));
}
args = args.SetObject(remaining_path_attr);
}
callback(EndpointAddresses(endpoint.addresses(), args));
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
RefCountedStringValue child_name_;
};
} // namespace
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<EndpointAddressesList>& addresses) {
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses) {
if (!addresses.ok()) return addresses.status();
HierarchicalAddressMap result;
RefCountedPtr<HierarchicalPathArg> remaining_path_attr;
for (const EndpointAddresses& endpoint_addresses : *addresses) {
const auto* path_arg =
endpoint_addresses.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) continue;
(*addresses)->ForEach([&](const EndpointAddresses& endpoint) {
const auto* path_arg = endpoint.args().GetObject<HierarchicalPathArg>();
if (path_arg == nullptr) return;
const std::vector<RefCountedStringValue>& path = path_arg->path();
auto it = path.begin();
if (it == path.end()) continue;
EndpointAddressesList& target_list = result[*it];
ChannelArgs args = endpoint_addresses.args();
++it;
if (it != path.end()) {
std::vector<RefCountedStringValue> remaining_path(it, path.end());
if (remaining_path_attr == nullptr ||
remaining_path_attr->path() != remaining_path) {
remaining_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(remaining_path));
}
args = args.SetObject(remaining_path_attr);
if (it == path.end()) return;
auto& target_list = result[*it];
if (target_list == nullptr) {
target_list =
std::make_shared<HierarchicalAddressIterator>(*addresses, *it);
}
target_list.emplace_back(endpoint_addresses.addresses(), args);
}
});
return result;
}

@ -20,6 +20,7 @@
#include <grpc/support/port_platform.h>
#include <map>
#include <memory>
#include <utility>
#include <vector>
@ -105,12 +106,12 @@ class HierarchicalPathArg : public RefCounted<HierarchicalPathArg> {
// A map from the next path element to the endpoint addresses that fall
// under that path element.
using HierarchicalAddressMap =
std::map<RefCountedStringValue, EndpointAddressesList,
std::map<RefCountedStringValue, std::shared_ptr<EndpointAddressesIterator>,
RefCountedStringValueLessThan>;
// Splits up the addresses into a separate list for each child.
absl::StatusOr<HierarchicalAddressMap> MakeHierarchicalAddressMap(
const absl::StatusOr<EndpointAddressesList>& addresses);
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses);
} // namespace grpc_core

@ -118,7 +118,7 @@ void EndpointList::Endpoint::Init(
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(addresses);
update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
update_args.args = child_args;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@ -163,15 +163,16 @@ RefCountedPtr<SubchannelInterface> EndpointList::Endpoint::CreateSubchannel(
//
void EndpointList::Init(
const EndpointAddressesList& endpoints, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
const EndpointAddresses&,
const ChannelArgs&)>
EndpointAddressesIterator* endpoints, const ChannelArgs& args,
absl::FunctionRef<OrphanablePtr<Endpoint>(RefCountedPtr<EndpointList>,
const EndpointAddresses&,
const ChannelArgs&)>
create_endpoint) {
for (const EndpointAddresses& addresses : endpoints) {
if (endpoints == nullptr) return;
endpoints->ForEach([&](const EndpointAddresses& endpoint) {
endpoints_.push_back(
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), addresses, args));
}
create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), endpoint, args));
});
}
void EndpointList::ResetBackoffLocked() {

@ -25,7 +25,7 @@
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
@ -53,7 +53,7 @@ namespace grpc_core {
class MyEndpointList : public EndpointList {
public:
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(lb_policy),
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer)
@ -184,8 +184,8 @@ class EndpointList : public InternallyRefCounted<EndpointList> {
EndpointList(RefCountedPtr<LoadBalancingPolicy> policy, const char* tracer)
: policy_(std::move(policy)), tracer_(tracer) {}
void Init(const EndpointAddressesList& endpoints, const ChannelArgs& args,
absl::AnyInvocable<OrphanablePtr<Endpoint>(
void Init(EndpointAddressesIterator* endpoints, const ChannelArgs& args,
absl::FunctionRef<OrphanablePtr<Endpoint>(
RefCountedPtr<EndpointList>, const EndpointAddresses&,
const ChannelArgs&)>
create_endpoint);

@ -72,6 +72,7 @@
#include <vector>
#include "absl/container/inlined_vector.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -384,9 +385,9 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns a text representation suitable for logging.
std::string AsText() const;
// Extracts all non-drop entries into an EndpointAddressesList.
EndpointAddressesList GetServerAddressList(
GrpcLbClientStats* client_stats) const;
// Extracts all non-drop entries into an EndpointAddressesIterator.
std::shared_ptr<EndpointAddressesIterator> GetServerAddressList(
GrpcLbClientStats* client_stats);
// Returns true if the serverlist contains at least one drop entry and
// no backend address entries.
@ -400,6 +401,8 @@ class GrpcLb : public LoadBalancingPolicy {
const char* ShouldDrop();
private:
class AddressIterator;
std::vector<GrpcLbServer> serverlist_;
// Accessed from the picker, so needs synchronization.
@ -504,6 +507,8 @@ class GrpcLb : public LoadBalancingPolicy {
RefCountedPtr<GrpcLb> parent_;
};
class NullLbTokenEndpointIterator;
void ShutdownLocked() override;
// Helper functions used in UpdateLocked().
@ -569,7 +574,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// The backend addresses from the resolver.
absl::StatusOr<EndpointAddressesList> fallback_backend_addresses_;
absl::StatusOr<std::shared_ptr<NullLbTokenEndpointIterator>>
fallback_backend_addresses_;
// The last resolution note from our parent.
// To be passed to child policy when fallback_backend_addresses_ is empty.
std::string resolution_note_;
@ -594,11 +600,30 @@ class GrpcLb : public LoadBalancingPolicy {
};
//
// GrpcLb::Serverlist
// GrpcLb::Serverlist::AddressIterator
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return serverlist_ == other.serverlist_;
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %" PRIuPTR
" of serverlist. Ignoring.",
server.port, idx);
}
return false;
}
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
" of serverlist. Ignoring",
server.ip_size, idx);
}
return false;
}
return true;
}
void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
@ -623,6 +648,53 @@ void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
}
}
class GrpcLb::Serverlist::AddressIterator : public EndpointAddressesIterator {
public:
AddressIterator(RefCountedPtr<Serverlist> serverlist,
RefCountedPtr<GrpcLbClientStats> client_stats)
: serverlist_(std::move(serverlist)),
client_stats_(std::move(client_stats)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (size_t i = 0; i < serverlist_->serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_->serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
auto addr_uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token "
"will be used instead",
addr_uri.ok() ? addr_uri->c_str()
: addr_uri.status().ToString().c_str());
}
// Return address with a channel arg containing LB token and stats object.
callback(EndpointAddresses(
addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
std::move(lb_token), client_stats_))));
}
}
private:
RefCountedPtr<Serverlist> serverlist_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
//
// GrpcLb::Serverlist
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return serverlist_ == other.serverlist_;
}
std::string GrpcLb::Serverlist::AsText() const {
std::vector<std::string> entries;
for (size_t i = 0; i < serverlist_.size(); ++i) {
@ -642,59 +714,12 @@ std::string GrpcLb::Serverlist::AsText() const {
return absl::StrJoin(entries, "");
}
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %" PRIuPTR
" of serverlist. Ignoring.",
server.port, idx);
}
return false;
}
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %" PRIuPTR
" of serverlist. Ignoring",
server.ip_size, idx);
}
return false;
}
return true;
}
// Returns addresses extracted from the serverlist.
EndpointAddressesList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
std::shared_ptr<EndpointAddressesIterator>
GrpcLb::Serverlist::GetServerAddressList(GrpcLbClientStats* client_stats) {
RefCountedPtr<GrpcLbClientStats> stats;
if (client_stats != nullptr) stats = client_stats->Ref();
EndpointAddressesList endpoints;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
const size_t lb_token_length = strnlen(
server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token));
std::string lb_token(server.load_balance_token, lb_token_length);
if (lb_token.empty()) {
auto addr_uri = grpc_sockaddr_to_uri(&addr);
gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
addr_uri.ok() ? addr_uri->c_str()
: addr_uri.status().ToString().c_str());
}
// Add address with a channel arg containing LB token and stats object.
endpoints.emplace_back(
addr, ChannelArgs().SetObject(MakeRefCounted<TokenAndClientStatsArg>(
std::move(lb_token), stats)));
}
return endpoints;
return std::make_shared<AddressIterator>(Ref(), std::move(stats));
}
bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
@ -1503,6 +1528,31 @@ void GrpcLb::ResetBackoffLocked() {
}
}
// Endpoint iterator wrapper to add null LB token attribute.
class GrpcLb::NullLbTokenEndpointIterator : public EndpointAddressesIterator {
public:
explicit NullLbTokenEndpointIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it)
: parent_it_(std::move(parent_it)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
endpoint.ToString().c_str());
}
callback(EndpointAddresses(endpoint.addresses(),
endpoint.args().SetObject(empty_token_)));
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
RefCountedPtr<TokenAndClientStatsArg> empty_token_ =
MakeRefCounted<TokenAndClientStatsArg>("", nullptr);
};
absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] received update", this);
@ -1512,19 +1562,11 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
GPR_ASSERT(config_ != nullptr);
args_ = std::move(args.args);
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
for (EndpointAddresses& endpoint : *fallback_backend_addresses_) {
endpoint = EndpointAddresses(
endpoint.addresses(),
endpoint.args().SetObject(
MakeRefCounted<TokenAndClientStatsArg>("", nullptr)));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] fallback address: %s", this,
endpoint.ToString().c_str());
}
}
if (!args.addresses.ok()) {
fallback_backend_addresses_ = args.addresses.status();
} else {
fallback_backend_addresses_ = std::make_shared<NullLbTokenEndpointIterator>(
std::move(*args.addresses));
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
@ -1756,6 +1798,12 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
return lb_policy;
}
bool EndpointIteratorIsEmpty(const EndpointAddressesIterator& endpoints) {
bool empty = true;
endpoints.ForEach([&](const EndpointAddresses&) { empty = false; });
return empty;
}
void GrpcLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
@ -1769,16 +1817,17 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
// picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
EndpointIteratorIsEmpty(**fallback_backend_addresses_)) {
update_args.resolution_note = absl::StrCat(
"grpclb in fallback mode without any balancer addresses: ",
"grpclb in fallback mode without any fallback addresses: ",
resolution_note_);
}
} else {
update_args.addresses = serverlist_->GetServerAddressList(
lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
is_backend_from_grpclb_load_balancer = true;
if (update_args.addresses.ok() && update_args.addresses->empty()) {
if (update_args.addresses.ok() &&
EndpointIteratorIsEmpty(**update_args.addresses)) {
update_args.resolution_note = "empty serverlist from grpclb balancer";
}
}

@ -661,7 +661,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (args.addresses.ok()) {
std::set<EndpointAddressSet> current_endpoints;
std::set<grpc_resolved_address, ResolvedAddressLessThan> current_addresses;
for (const EndpointAddresses& endpoint : *args.addresses) {
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
EndpointAddressSet key(endpoint.addresses());
current_endpoints.emplace(key);
for (const grpc_resolved_address& address : endpoint.addresses()) {
@ -708,7 +708,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
}
it->second->DisableEjection();
}
}
});
// Remove any entries we no longer need in the subchannel map.
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
@ -753,7 +753,6 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_policy();
// Update the policy.
update_args.args = std::move(args.args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,

@ -21,7 +21,6 @@
#include <inttypes.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <set>
#include <string>
@ -114,7 +113,7 @@ class PickFirst : public LoadBalancingPolicy {
public:
class SubchannelData {
public:
SubchannelData(SubchannelList* subchannel_list,
SubchannelData(SubchannelList* subchannel_list, size_t index,
RefCountedPtr<SubchannelInterface> subchannel);
SubchannelInterface* subchannel() const { return subchannel_.get(); }
@ -125,12 +124,6 @@ class PickFirst : public LoadBalancingPolicy {
return connectivity_status_;
}
// Returns the index into the subchannel list of this object.
size_t Index() const {
return static_cast<size_t>(this -
&subchannel_list_->subchannels_.front());
}
// Resets the connection backoff.
void ResetBackoffLocked() {
if (subchannel_ != nullptr) subchannel_->ResetBackoff();
@ -153,10 +146,8 @@ class PickFirst : public LoadBalancingPolicy {
class Watcher
: public SubchannelInterface::ConnectivityStateWatcherInterface {
public:
Watcher(SubchannelData* subchannel_data,
RefCountedPtr<SubchannelList> subchannel_list)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)) {}
Watcher(RefCountedPtr<SubchannelList> subchannel_list, size_t index)
: subchannel_list_(std::move(subchannel_list)), index_(index) {}
~Watcher() override {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
@ -164,8 +155,8 @@ class PickFirst : public LoadBalancingPolicy {
void OnConnectivityStateChange(grpc_connectivity_state new_state,
absl::Status status) override {
subchannel_data_->OnConnectivityStateChange(new_state,
std::move(status));
subchannel_list_->subchannels_[index_].OnConnectivityStateChange(
new_state, std::move(status));
}
grpc_pollset_set* interested_parties() override {
@ -173,8 +164,8 @@ class PickFirst : public LoadBalancingPolicy {
}
private:
SubchannelData* subchannel_data_;
RefCountedPtr<SubchannelList> subchannel_list_;
const size_t index_;
};
// This method will be invoked once soon after instantiation to report
@ -193,6 +184,7 @@ class PickFirst : public LoadBalancingPolicy {
// Backpointer to owning subchannel list. Not owned.
SubchannelList* subchannel_list_;
const size_t index_;
// The subchannel.
RefCountedPtr<SubchannelInterface> subchannel_;
// Will be non-null when the subchannel's state is being watched.
@ -205,7 +197,8 @@ class PickFirst : public LoadBalancingPolicy {
};
SubchannelList(RefCountedPtr<PickFirst> policy,
EndpointAddressesList addresses, const ChannelArgs& args);
EndpointAddressesIterator* addresses,
const ChannelArgs& args);
~SubchannelList() override;
@ -413,9 +406,9 @@ void PickFirst::ResetBackoffLocked() {
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from latest_update_args_.
EndpointAddressesList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
addresses = latest_update_args_.addresses->get();
}
// Replace latest_pending_subchannel_list_.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) &&
@ -425,7 +418,7 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeOrphanable<SubchannelList>(
Ref(), std::move(addresses), latest_update_args_.args);
Ref(), addresses, latest_update_args_.args);
// Empty update or no valid subchannels. Put the channel in
// TRANSIENT_FAILURE and request re-resolution.
if (latest_pending_subchannel_list_->size() == 0) {
@ -483,9 +476,7 @@ class AddressFamilyIterator {
absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
if (args.addresses.ok()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
args.addresses->size());
gpr_log(GPR_INFO, "Pick First %p received update", this);
} else {
gpr_log(GPR_INFO, "Pick First %p received update with address error: %s",
this, args.addresses.status().ToString().c_str());
@ -495,51 +486,59 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
absl::Status status;
if (!args.addresses.ok()) {
status = args.addresses.status();
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
} else {
// Shuffle the list if needed.
auto config = static_cast<PickFirstConfig*>(args.config.get());
if (config->shuffle_addresses()) {
absl::c_shuffle(*args.addresses, bit_gen_);
}
// Flatten the list so that we have one address per endpoint.
// While we're iterating, also determine the desired address family
// order and the index of the first element of each family, for use in
// the interleaving below.
std::set<absl::string_view> address_families;
std::vector<AddressFamilyIterator> address_family_order;
EndpointAddressesList endpoints;
for (const auto& endpoint : *args.addresses) {
for (const auto& address : endpoint.addresses()) {
endpoints.emplace_back(address, endpoint.args());
if (IsPickFirstHappyEyeballsEnabled()) {
absl::string_view scheme = GetAddressFamily(address);
bool inserted = address_families.insert(scheme).second;
if (inserted) {
address_family_order.emplace_back(scheme, endpoints.size() - 1);
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
endpoints.push_back(endpoint);
});
if (endpoints.empty()) {
status = absl::UnavailableError("address list must not be empty");
} else {
// Shuffle the list if needed.
auto config = static_cast<PickFirstConfig*>(args.config.get());
if (config->shuffle_addresses()) {
absl::c_shuffle(endpoints, bit_gen_);
}
// Flatten the list so that we have one address per endpoint.
// While we're iterating, also determine the desired address family
// order and the index of the first element of each family, for use in
// the interleaving below.
std::set<absl::string_view> address_families;
std::vector<AddressFamilyIterator> address_family_order;
EndpointAddressesList flattened_endpoints;
for (const auto& endpoint : endpoints) {
for (const auto& address : endpoint.addresses()) {
flattened_endpoints.emplace_back(address, endpoint.args());
if (IsPickFirstHappyEyeballsEnabled()) {
absl::string_view scheme = GetAddressFamily(address);
bool inserted = address_families.insert(scheme).second;
if (inserted) {
address_family_order.emplace_back(scheme,
flattened_endpoints.size() - 1);
}
}
}
}
}
// Interleave addresses as per RFC-8305 section 4.
if (IsPickFirstHappyEyeballsEnabled()) {
EndpointAddressesList interleaved_endpoints;
interleaved_endpoints.reserve(endpoints.size());
std::vector<bool> endpoints_moved(endpoints.size());
size_t scheme_index = 0;
for (size_t i = 0; i < endpoints.size(); ++i) {
EndpointAddresses* endpoint;
do {
auto& iterator = address_family_order[scheme_index++ %
address_family_order.size()];
endpoint = iterator.Next(endpoints, &endpoints_moved);
} while (endpoint == nullptr);
interleaved_endpoints.emplace_back(std::move(*endpoint));
endpoints = std::move(flattened_endpoints);
// Interleave addresses as per RFC-8305 section 4.
if (IsPickFirstHappyEyeballsEnabled()) {
EndpointAddressesList interleaved_endpoints;
interleaved_endpoints.reserve(endpoints.size());
std::vector<bool> endpoints_moved(endpoints.size());
size_t scheme_index = 0;
for (size_t i = 0; i < endpoints.size(); ++i) {
EndpointAddresses* endpoint;
do {
auto& iterator = address_family_order[scheme_index++ %
address_family_order.size()];
endpoint = iterator.Next(endpoints, &endpoints_moved);
} while (endpoint == nullptr);
interleaved_endpoints.emplace_back(std::move(*endpoint));
}
endpoints = std::move(interleaved_endpoints);
}
args.addresses = std::move(interleaved_endpoints);
} else {
args.addresses = std::move(endpoints);
args.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
}
}
// If the update contains a resolver error and we have a previous update
@ -617,18 +616,20 @@ void PickFirst::HealthWatcher::OnConnectivityStateChange(
//
PickFirst::SubchannelList::SubchannelData::SubchannelData(
SubchannelList* subchannel_list,
SubchannelList* subchannel_list, size_t index,
RefCountedPtr<SubchannelInterface> subchannel)
: subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) {
: subchannel_list_(subchannel_list),
index_(index),
subchannel_(std::move(subchannel)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR
" (subchannel %p): starting watch",
subchannel_list_->policy_.get(), subchannel_list_,
subchannel_list_->size(), subchannel_.get());
subchannel_list_->policy_.get(), subchannel_list_, index_,
subchannel_.get());
}
auto watcher = std::make_unique<Watcher>(
this, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher"), index_);
pending_watcher_ = watcher.get();
subchannel_->WatchConnectivityState(std::move(watcher));
}
@ -639,7 +640,7 @@ void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
gpr_log(GPR_INFO,
"[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): cancelling watch and unreffing subchannel",
subchannel_list_->policy_.get(), subchannel_list_, Index(),
subchannel_list_->policy_.get(), subchannel_list_, index_,
subchannel_list_->size(), subchannel_.get());
}
subchannel_->CancelConnectivityStateWatch(pending_watcher_);
@ -659,7 +660,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
"status=%s, shutting_down=%d, pending_watcher=%p, "
"seen_transient_failure=%d, p->selected_=%p, "
"p->subchannel_list_=%p, p->latest_pending_subchannel_list_=%p",
p, subchannel_list_, Index(), subchannel_list_->size(),
p, subchannel_list_, index_, subchannel_list_->size(),
subchannel_.get(),
(connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
@ -771,7 +772,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
if (!IsPickFirstHappyEyeballsEnabled()) {
// Ignore any other updates for subchannels we're not currently trying to
// connect to.
if (Index() != subchannel_list_->attempting_index_) return;
if (index_ != subchannel_list_->attempting_index_) return;
// React to the connectivity state.
ReactToConnectivityStateLocked();
return;
@ -784,7 +785,7 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
if (!prev_seen_transient_failure && seen_transient_failure_) {
// If a connection attempt fails before the timer fires, then
// cancel the timer and start connecting on the next subchannel.
if (Index() == subchannel_list_->attempting_index_) {
if (index_ == subchannel_list_->attempting_index_) {
if (subchannel_list_->timer_handle_.has_value()) {
p->channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_list_->timer_handle_);
@ -858,7 +859,7 @@ void PickFirst::SubchannelList::SubchannelData::
// We skip subchannels in state TRANSIENT_FAILURE to avoid a
// large recursion that could overflow the stack.
SubchannelData* found_subchannel = nullptr;
for (size_t next_index = Index() + 1;
for (size_t next_index = index_ + 1;
next_index < subchannel_list_->size(); ++next_index) {
SubchannelData* sc = &subchannel_list_->subchannels_[next_index];
GPR_ASSERT(sc->connectivity_state_.has_value());
@ -946,14 +947,14 @@ void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
}
// If this is not the last subchannel in the list, start the timer.
if (Index() != subchannel_list_->size() - 1) {
if (index_ != subchannel_list_->size() - 1) {
PickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p: starting Connection "
"Attempt Delay timer for %" PRId64 "ms for index %" PRIuPTR,
p, subchannel_list_, p->connection_attempt_delay_.millis(),
Index());
index_);
}
subchannel_list_->timer_handle_ =
p->channel_control_helper()->GetEventEngine()->RunAfter(
@ -1041,7 +1042,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
}
// Unref all other subchannels in the list.
for (size_t i = 0; i < subchannel_list_->size(); ++i) {
if (i != Index()) {
if (i != index_) {
subchannel_list_->subchannels_[i].ShutdownLocked();
}
}
@ -1052,7 +1053,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
//
PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
EndpointAddressesList addresses,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: InternallyRefCounted<SubchannelList>(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
@ -1062,14 +1063,12 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
.Remove(
GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"[PF %p] Creating subchannel list %p for %" PRIuPTR
" subchannels - channel args: %s",
policy_.get(), this, addresses.size(), args_.ToString().c_str());
gpr_log(GPR_INFO, "[PF %p] Creating subchannel list %p - channel args: %s",
policy_.get(), this, args_.ToString().c_str());
}
subchannels_.reserve(addresses.size());
if (addresses == nullptr) return;
// Create a subchannel for each address.
for (const EndpointAddresses& address : addresses) {
addresses->ForEach([&](const EndpointAddresses& address) {
GPR_ASSERT(address.addresses().size() == 1);
RefCountedPtr<SubchannelInterface> subchannel =
policy_->channel_control_helper()->CreateSubchannel(
@ -1081,7 +1080,7 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
"[PF %p] could not create subchannel for address %s, ignoring",
policy_.get(), address.ToString().c_str());
}
continue;
return;
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
@ -1090,8 +1089,8 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr<PickFirst> policy,
policy_.get(), this, subchannels_.size(), subchannel.get(),
address.ToString().c_str());
}
subchannels_.emplace_back(this, std::move(subchannel));
}
subchannels_.emplace_back(this, subchannels_.size(), std::move(subchannel));
});
}
PickFirst::SubchannelList::~SubchannelList() {

@ -684,7 +684,8 @@ absl::Status PriorityLb::ChildPriority::UpdateLocked(
if (priority_policy_->addresses_.ok()) {
auto it = priority_policy_->addresses_->find(name_);
if (it == priority_policy_->addresses_->end()) {
update_args.addresses.emplace();
update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList());
} else {
update_args.addresses = it->second;
}

@ -554,7 +554,8 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses.emplace().emplace_back(ring_hash_->endpoints_[index_]);
update_args.addresses =
std::make_shared<SingleEndpointIterator>(ring_hash_->endpoints_[index_]);
update_args.args = ring_hash_->args_;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@ -622,18 +623,14 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
// Check address list.
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RH %p] received update", this);
}
// De-dup endpoints, taking weight into account.
endpoints_.clear();
endpoints_.reserve(args.addresses->size());
std::map<EndpointAddressSet, size_t> endpoint_indices;
size_t num_skipped = 0;
for (size_t i = 0; i < args.addresses->size(); ++i) {
EndpointAddresses& endpoint = (*args.addresses)[i];
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
const EndpointAddressSet key(endpoint.addresses());
auto p = endpoint_indices.emplace(key, i - num_skipped);
auto p = endpoint_indices.emplace(key, endpoints_.size());
if (!p.second) {
// Duplicate endpoint. Combine weights and skip the dup.
EndpointAddresses& prev_endpoint = endpoints_[p.first->second];
@ -651,11 +648,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
prev_endpoint.addresses(),
prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT,
weight_arg + prev_weight_arg));
++num_skipped;
} else {
endpoints_.push_back(std::move(endpoint));
endpoints_.push_back(endpoint);
}
}
});
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",

@ -707,7 +707,7 @@ class RlsLb : public LoadBalancingPolicy {
OrphanablePtr<RlsChannel> rls_channel_ ABSL_GUARDED_BY(mu_);
// Accessed only from within WorkSerializer.
absl::StatusOr<EndpointAddressesList> addresses_;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses_;
ChannelArgs channel_args_;
RefCountedPtr<RlsLbConfig> config_;
RefCountedPtr<ChildPolicyWrapper> default_child_policy_;
@ -1858,6 +1858,27 @@ RlsLb::RlsLb(Args args) : LoadBalancingPolicy(std::move(args)), cache_(this) {
}
}
bool EndpointsEqual(
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> endpoints1,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>
endpoints2) {
if (endpoints1.status() != endpoints2.status()) return false;
if (endpoints1.ok()) {
std::vector<EndpointAddresses> e1_list;
(*endpoints1)->ForEach([&](const EndpointAddresses& endpoint) {
e1_list.push_back(endpoint);
});
size_t i = 0;
bool different = false;
(*endpoints2)->ForEach([&](const EndpointAddresses& endpoint) {
if (endpoint != e1_list[i++]) different = true;
});
if (different) return false;
if (i != e1_list.size()) return false;
}
return true;
}
absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_rls_trace)) {
gpr_log(GPR_INFO, "[rlslb %p] policy updated", this);
@ -1875,7 +1896,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Swap out addresses.
// If the new address list is an error and we have an existing address list,
// stick with the existing addresses.
absl::StatusOr<EndpointAddressesList> old_addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> old_addresses;
if (args.addresses.ok()) {
old_addresses = std::move(addresses_);
addresses_ = std::move(args.addresses);
@ -1888,7 +1909,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
bool update_child_policies =
old_config == nullptr ||
old_config->child_policy_config() != config_->child_policy_config() ||
old_addresses != addresses_ || args.args != channel_args_;
!EndpointsEqual(old_addresses, addresses_) || args.args != channel_args_;
// If default target changes, swap out child policy.
bool created_default_child = false;
if (old_config == nullptr ||

@ -125,14 +125,14 @@ class OldRoundRobin : public LoadBalancingPolicy {
: public SubchannelList<RoundRobinSubchannelList,
RoundRobinSubchannelData> {
public:
RoundRobinSubchannelList(OldRoundRobin* policy, ServerAddressList addresses,
RoundRobinSubchannelList(OldRoundRobin* policy,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
? "RoundRobinSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
addresses, policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -277,13 +277,12 @@ void OldRoundRobin::ResetBackoffLocked() {
}
absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
ServerAddressList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RR %p] received update", this);
}
addresses = std::move(*args.addresses);
addresses = args.addresses->get();
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
@ -299,8 +298,8 @@ absl::Status OldRoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending subchannel list %p",
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ = MakeRefCounted<RoundRobinSubchannelList>(
this, std::move(addresses), args.args);
latest_pending_subchannel_list_ =
MakeRefCounted<RoundRobinSubchannelList>(this, addresses, args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
@ -524,7 +523,7 @@ class RoundRobin : public LoadBalancingPolicy {
class RoundRobinEndpointList : public EndpointList {
public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(round_robin),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
@ -687,13 +686,12 @@ void RoundRobin::ResetBackoffLocked() {
}
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
EndpointAddressesList addresses;
EndpointAddressesIterator* addresses = nullptr;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " endpoints",
this, args.addresses->size());
gpr_log(GPR_INFO, "[RR %p] received update", this);
}
addresses = std::move(*args.addresses);
addresses = args.addresses->get();
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] received update with address error: %s", this,
@ -710,8 +708,7 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), std::move(addresses),
args.args);
Ref(DEBUG_LOCATION, "RoundRobinEndpointList"), addresses, args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {

@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -208,7 +209,7 @@ class SubchannelList : public DualRefCounted<SubchannelListType> {
protected:
SubchannelList(LoadBalancingPolicy* policy, const char* tracer,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
LoadBalancingPolicy::ChannelControlHelper* helper,
const ChannelArgs& args);
@ -365,19 +366,18 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, const char* tracer,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
LoadBalancingPolicy::ChannelControlHelper* helper, const ChannelArgs& args)
: DualRefCounted<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer) {
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO,
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
tracer_, policy, this, addresses.size());
gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p", tracer_, policy,
this);
}
subchannels_.reserve(addresses.size());
if (addresses == nullptr) return;
// Create a subchannel for each address.
for (ServerAddress address : addresses) {
addresses->ForEach([&](const EndpointAddresses& address) {
RefCountedPtr<SubchannelInterface> subchannel =
helper->CreateSubchannel(address.address(), address.args(), args);
if (subchannel == nullptr) {
@ -387,7 +387,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
"[%s %p] could not create subchannel for address %s, ignoring",
tracer_, policy_, address.ToString().c_str());
}
continue;
return;
}
if (GPR_UNLIKELY(tracer_ != nullptr)) {
gpr_log(GPR_INFO,
@ -397,8 +397,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address.ToString().c_str());
}
subchannels_.emplace_back();
subchannels_.back().Init(this, std::move(address), std::move(subchannel));
}
subchannels_.back().Init(this, address, std::move(subchannel));
});
}
template <typename SubchannelListType, typename SubchannelDataType>

@ -247,14 +247,13 @@ class OldWeightedRoundRobin : public LoadBalancingPolicy {
WeightedRoundRobinSubchannelData> {
public:
WeightedRoundRobinSubchannelList(OldWeightedRoundRobin* policy,
ServerAddressList addresses,
EndpointAddressesIterator* addresses,
const ChannelArgs& args)
: SubchannelList(policy,
(GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WeightedRoundRobinSubchannelList"
: nullptr),
std::move(addresses), policy->channel_control_helper(),
args) {
addresses, policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -675,11 +674,10 @@ void OldWeightedRoundRobin::ResetBackoffLocked() {
absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
config_ = std::move(args.config);
ServerAddressList addresses;
std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
// Weed out duplicate addresses. Also sort the addresses so that if
// the set of the addresses don't change, their indexes in the
@ -698,10 +696,12 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
}
};
std::set<ServerAddress, AddressLessThan> ordered_addresses(
args.addresses->begin(), args.addresses->end());
addresses =
ServerAddressList(ordered_addresses.begin(), ordered_addresses.end());
std::set<ServerAddress, AddressLessThan> ordered_addresses;
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
ordered_addresses.insert(endpoint);
});
addresses = std::make_shared<EndpointAddressesListIterator>(
ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
@ -718,8 +718,8 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) {
this, latest_pending_subchannel_list_.get());
}
latest_pending_subchannel_list_ =
MakeRefCounted<WeightedRoundRobinSubchannelList>(
this, std::move(addresses), args.args);
MakeRefCounted<WeightedRoundRobinSubchannelList>(this, addresses.get(),
args.args);
latest_pending_subchannel_list_->StartWatchingLocked(args.args);
// If the new list is empty, immediately promote it to
// subchannel_list_ and report TRANSIENT_FAILURE.
@ -1079,7 +1079,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr,
const EndpointAddressesList& endpoints,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
@ -1516,11 +1516,10 @@ void WeightedRoundRobin::ResetBackoffLocked() {
absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
config_ = std::move(args.config);
EndpointAddressesList addresses;
std::shared_ptr<EndpointAddressesIterator> addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
gpr_log(GPR_INFO, "[WRR %p] received update", this);
}
// Weed out duplicate endpoints. Also sort the endpoints so that if
// the set of endpoints doesn't change, their indexes in the endpoint
@ -1539,10 +1538,13 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
return e1 < e2;
}
};
std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses(
args.addresses->begin(), args.addresses->end());
addresses = EndpointAddressesList(ordered_addresses.begin(),
ordered_addresses.end());
std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses;
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
ordered_addresses.insert(endpoint);
});
addresses =
std::make_shared<EndpointAddressesListIterator>(EndpointAddressesList(
ordered_addresses.begin(), ordered_addresses.end()));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
@ -1559,7 +1561,7 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
this, latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ =
MakeOrphanable<WrrEndpointList>(Ref(), std::move(addresses), args.args);
MakeOrphanable<WrrEndpointList>(Ref(), addresses.get(), args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {

@ -157,10 +157,10 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override;
absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<EndpointAddressesList> addresses,
const std::string& resolution_note,
const ChannelArgs& args);
absl::Status UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args);
void ResetBackoffLocked();
void DeactivateLocked();
@ -338,11 +338,12 @@ absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
target = MakeOrphanable<WeightedChild>(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
absl::StatusOr<EndpointAddressesList> addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
if (address_map.ok()) {
auto it = address_map->find(name);
if (it == address_map->end()) {
addresses.emplace();
addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList());
} else {
addresses = std::move(it->second);
}
@ -589,7 +590,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
const std::string& resolution_note, const ChannelArgs& args) {
if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.

@ -250,7 +250,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
absl::Status UpdateChildPolicyLocked(
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
std::string resolution_note, const ChannelArgs& args);
void MaybeUpdatePickerLocked();
@ -569,7 +569,7 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
}
absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
absl::StatusOr<EndpointAddressesList> addresses,
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
std::string resolution_note, const ChannelArgs& args) {
// Create policy if needed.
if (child_policy_ == nullptr) {

@ -149,7 +149,8 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
absl::Status UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const absl::StatusOr<EndpointAddressesList>& addresses,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
addresses,
const ChannelArgs& args);
void ExitIdleLocked();
void ResetBackoffLocked();
@ -482,7 +483,7 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr<LoadBalancingPolicy::Config> config,
const absl::StatusOr<EndpointAddressesList>& addresses,
const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
const ChannelArgs& args) {
if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.

@ -28,6 +28,7 @@
#include <utility>
#include <vector>
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -61,6 +62,7 @@
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/gprpp/work_serializer.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@ -390,7 +392,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
absl::Status UpdateChildPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
EndpointAddressesList CreateChildPolicyAddressesLocked();
std::shared_ptr<EndpointAddressesIterator> CreateChildPolicyAddressesLocked();
std::string CreateChildPolicyResolutionNoteLocked();
RefCountedPtr<Config> CreateChildPolicyConfigLocked();
ChannelArgs CreateChildPolicyArgsLocked(const ChannelArgs& args_in);
@ -529,10 +531,16 @@ XdsClusterResolverLb::DiscoveryMechanismEntry::config() const {
->config_->discovery_mechanisms()[discovery_mechanism->index()];
}
std::string MakeChildPolicyName(absl::string_view cluster_name,
size_t child_number) {
return absl::StrCat("{cluster=", cluster_name,
", child_number=", child_number, "}");
}
std::string XdsClusterResolverLb::DiscoveryMechanismEntry::GetChildPolicyName(
size_t priority) const {
return absl::StrCat("{cluster=", config().cluster_name,
", child_number=", priority_child_numbers[priority], "}");
return MakeChildPolicyName(config().cluster_name,
priority_child_numbers[priority]);
}
//
@ -768,39 +776,76 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
// child policy-related methods
//
EndpointAddressesList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
EndpointAddressesList addresses;
for (const auto& discovery_entry : discovery_mechanisms_) {
const auto& priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name =
discovery_entry.GetChildPolicyName(priority);
for (const auto& p : priority_entry.localities) {
const auto& locality_name = p.first;
const auto& locality = p.second;
std::vector<RefCountedStringValue> hierarchical_path = {
RefCountedStringValue(priority_child_name),
RefCountedStringValue(locality_name->AsHumanReadableString())};
auto hierarchical_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
for (const auto& endpoint : locality.endpoints) {
uint32_t endpoint_weight =
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
addresses.emplace_back(
endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
.SetObject(locality_name->Ref())
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight));
class PriorityEndpointIterator : public EndpointAddressesIterator {
public:
struct DiscoveryMechanismResult {
std::shared_ptr<const XdsEndpointResource> update;
std::string cluster_name;
std::vector<size_t /*child_number*/> priority_child_numbers;
DiscoveryMechanismResult(
std::shared_ptr<const XdsEndpointResource> resource,
std::string cluster, std::vector<size_t> child_numbers)
: update(std::move(resource)),
cluster_name(std::move(cluster)),
priority_child_numbers(std::move(child_numbers)) {}
std::string GetChildPolicyName(size_t priority) const {
return MakeChildPolicyName(cluster_name,
priority_child_numbers[priority]);
}
};
explicit PriorityEndpointIterator(
std::vector<DiscoveryMechanismResult> results)
: results_(std::move(results)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (const auto& entry : results_) {
const auto& priority_list = GetUpdatePriorityList(*entry.update);
for (size_t priority = 0; priority < priority_list.size(); ++priority) {
const auto& priority_entry = priority_list[priority];
std::string priority_child_name = entry.GetChildPolicyName(priority);
for (const auto& p : priority_entry.localities) {
const auto& locality_name = p.first;
const auto& locality = p.second;
std::vector<RefCountedStringValue> hierarchical_path = {
RefCountedStringValue(priority_child_name),
RefCountedStringValue(locality_name->AsHumanReadableString())};
auto hierarchical_path_attr =
MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
for (const auto& endpoint : locality.endpoints) {
uint32_t endpoint_weight =
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
callback(EndpointAddresses(
endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
.SetObject(locality_name->Ref())
.Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight)));
}
}
}
}
}
return addresses;
private:
std::vector<DiscoveryMechanismResult> results_;
};
std::shared_ptr<EndpointAddressesIterator>
XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
std::vector<PriorityEndpointIterator::DiscoveryMechanismResult> entries;
entries.reserve(discovery_mechanisms_.size());
for (const auto& discovery_entry : discovery_mechanisms_) {
entries.emplace_back(discovery_entry.latest_update,
discovery_entry.config().cluster_name,
discovery_entry.priority_child_numbers);
}
return std::make_shared<PriorityEndpointIterator>(std::move(entries));
}
std::string XdsClusterResolverLb::CreateChildPolicyResolutionNoteLocked() {

@ -18,7 +18,6 @@
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.h"
#include <inttypes.h>
#include <stddef.h>
#include <algorithm>
@ -34,6 +33,7 @@
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/function_ref.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
@ -300,8 +300,7 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
absl::StatusOr<EndpointAddressesList> UpdateAddressMap(
absl::StatusOr<EndpointAddressesList> endpoints);
void UpdateAddressMap(const EndpointAddressesIterator& endpoints);
RefCountedPtr<SubchannelWrapper> AdoptSubchannel(
const grpc_resolved_address& address,
@ -508,12 +507,36 @@ void XdsOverrideHostLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
// Wraps the endpoint iterator and filters out endpoints in state DRAINING.
class ChildEndpointIterator : public EndpointAddressesIterator {
public:
explicit ChildEndpointIterator(
std::shared_ptr<EndpointAddressesIterator> parent_it)
: parent_it_(std::move(parent_it)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
parent_it_->ForEach([&](const EndpointAddresses& endpoint) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: not draining, "
"passing to child",
this, endpoint.ToString().c_str());
}
callback(endpoint);
}
});
}
private:
std::shared_ptr<EndpointAddressesIterator> parent_it_;
};
absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] Received update with %" PRIuPTR
" addresses",
this, args.addresses.ok() ? args.addresses->size() : 0);
gpr_log(GPR_INFO, "[xds_override_host_lb %p] Received update", this);
}
auto old_config = std::move(config_);
// Update config.
@ -521,13 +544,24 @@ absl::Status XdsOverrideHostLb::UpdateLocked(UpdateArgs args) {
if (config_ == nullptr) {
return absl::InvalidArgumentError("Missing policy config");
}
// Update address map and wrap endpoint iterator for child policy.
if (args.addresses.ok()) {
UpdateAddressMap(**args.addresses);
args.addresses =
std::make_shared<ChildEndpointIterator>(std::move(*args.addresses));
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
args.addresses.status().ToString().c_str());
}
}
// Create child policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args.args);
}
// Update child policy.
UpdateArgs update_args;
update_args.addresses = UpdateAddressMap(std::move(args.addresses));
update_args.addresses = std::move(args.addresses);
update_args.resolution_note = std::move(args.resolution_note);
update_args.config = config_->child_config();
update_args.args = std::move(args.args);
@ -578,18 +612,9 @@ OrphanablePtr<LoadBalancingPolicy> XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
absl::StatusOr<EndpointAddressesList> endpoints) {
if (!endpoints.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
endpoints.status().ToString().c_str());
}
return endpoints;
}
// Construct the list of addresses to pass to the child policy and a
// map of address info from which to update subchannel_map_.
EndpointAddressesList child_addresses;
void XdsOverrideHostLb::UpdateAddressMap(
const EndpointAddressesIterator& endpoints) {
// Construct a map of address info from which to update subchannel_map_.
struct AddressInfo {
XdsHealthStatus eds_health_status;
RefCountedStringValue address_list;
@ -597,25 +622,18 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
: eds_health_status(status), address_list(std::move(addresses)) {}
};
std::map<const std::string, AddressInfo> addresses_for_map;
for (const auto& endpoint : *endpoints) {
endpoints.ForEach([&](const EndpointAddresses& endpoint) {
XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: not draining, "
"passing to child",
this, endpoint.ToString().c_str());
}
child_addresses.push_back(endpoint);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
// Skip draining hosts if not in the override status set.
if (status.status() == XdsHealthStatus::kDraining &&
!config_->override_host_status_set().Contains(status)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
"[xds_override_host_lb %p] endpoint %s: draining but not in "
"override_host_status set -- ignoring",
this, endpoint.ToString().c_str());
}
continue;
return;
}
std::vector<std::string> addresses;
addresses.reserve(endpoint.addresses().size());
@ -641,7 +659,7 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
std::piecewise_construct, std::forward_as_tuple(addresses[i]),
std::forward_as_tuple(status, std::move(address_list)));
}
}
});
// Now grab the lock and update subchannel_map_ from addresses_for_map.
{
MutexLock lock(&subchannel_map_mu_);
@ -688,7 +706,6 @@ absl::StatusOr<EndpointAddressesList> XdsOverrideHostLb::UpdateAddressMap(
it->second.set_address_list(std::move(address_info.address_list));
}
}
return child_addresses;
}
RefCountedPtr<XdsOverrideHostLb::SubchannelWrapper>

@ -21,7 +21,6 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -170,10 +169,10 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
// Scan the addresses to find the weight for each locality.
std::map<std::string, uint32_t> locality_weights;
if (args.addresses.ok()) {
for (const auto& address : *args.addresses) {
auto* locality_name = address.args().GetObject<XdsLocalityName>();
(*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) {
auto* locality_name = endpoint.args().GetObject<XdsLocalityName>();
uint32_t weight =
address.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
endpoint.args().GetInt(GRPC_ARG_XDS_LOCALITY_WEIGHT).value_or(0);
if (locality_name != nullptr && weight > 0) {
auto p = locality_weights.emplace(
locality_name->AsHumanReadableString(), weight);
@ -184,7 +183,7 @@ absl::Status XdsWrrLocalityLb::UpdateLocked(UpdateArgs args) {
p.first->first.c_str(), p.first->second, weight);
}
}
}
});
}
// Construct the config for the weighted_target policy.
Json::Object weighted_targets;

@ -104,6 +104,19 @@ inline bool operator==(const RefCountedStringValue& lhs,
return lhs.as_string_view() == rhs.as_string_view();
}
inline bool operator!=(const RefCountedStringValue& lhs,
absl::string_view rhs) {
return lhs.as_string_view() != rhs;
}
inline bool operator!=(absl::string_view lhs,
const RefCountedStringValue& rhs) {
return lhs != rhs.as_string_view();
}
inline bool operator!=(const RefCountedStringValue& lhs,
const RefCountedStringValue& rhs) {
return lhs.as_string_view() != rhs.as_string_view();
}
inline bool operator<(const RefCountedStringValue& lhs, absl::string_view rhs) {
return lhs.as_string_view() < rhs;
}

@ -346,7 +346,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
struct UpdateArgs {
/// A list of endpoints, each with one or more address, or an error
/// indicating a failure to obtain the list of addresses.
absl::StatusOr<EndpointAddressesList> addresses;
absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
/// The LB policy config.
RefCountedPtr<Config> config;
/// A human-readable note providing context about the name resolution that

@ -22,7 +22,6 @@
#include <string.h>
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

@ -23,8 +23,11 @@
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "absl/functional/function_ref.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolved_address.h"
@ -64,6 +67,9 @@ class EndpointAddresses {
bool operator==(const EndpointAddresses& other) const {
return Cmp(other) == 0;
}
bool operator!=(const EndpointAddresses& other) const {
return Cmp(other) != 0;
}
bool operator<(const EndpointAddresses& other) const {
return Cmp(other) < 0;
}
@ -111,6 +117,48 @@ class EndpointAddressSet {
std::set<grpc_resolved_address, ResolvedAddressLessThan> addresses_;
};
// An iterator interface for endpoints.
class EndpointAddressesIterator {
public:
virtual ~EndpointAddressesIterator() = default;
// Invokes callback once for each endpoint.
virtual void ForEach(
absl::FunctionRef<void(const EndpointAddresses&)> callback) const = 0;
};
// Iterator over a fixed list of endpoints.
class EndpointAddressesListIterator : public EndpointAddressesIterator {
public:
explicit EndpointAddressesListIterator(EndpointAddressesList endpoints)
: endpoints_(std::move(endpoints)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
for (const auto& endpoint : endpoints_) {
callback(endpoint);
}
}
private:
EndpointAddressesList endpoints_;
};
// Iterator that returns only a single endpoint.
class SingleEndpointIterator : public EndpointAddressesIterator {
public:
explicit SingleEndpointIterator(EndpointAddresses endpoint)
: endpoint_(std::move(endpoint)) {}
void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
const override {
callback(endpoint_);
}
private:
EndpointAddresses endpoint_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_RESOLVER_ENDPOINT_ADDRESSES_H

@ -773,7 +773,8 @@ class LoadBalancingPolicyTest : public ::testing::Test {
absl::Span<const EndpointAddresses> endpoints,
RefCountedPtr<LoadBalancingPolicy::Config> config) {
LoadBalancingPolicy::UpdateArgs update;
update.addresses.emplace(endpoints.begin(), endpoints.end());
update.addresses = std::make_shared<EndpointAddressesListIterator>(
EndpointAddressesList(endpoints.begin(), endpoints.end()));
update.config = std::move(config);
return update;
}

@ -99,11 +99,13 @@ class XdsOverrideHostTest : public LoadBalancingPolicyTest {
"HEALTHY"}) {
LoadBalancingPolicy::UpdateArgs update;
update.config = MakeXdsOverrideHostConfig(override_host_status);
update.addresses.emplace();
EndpointAddressesList endpoints;
for (auto address_and_status : addresses_and_statuses) {
update.addresses->push_back(MakeAddressWithHealthStatus(
endpoints.push_back(MakeAddressWithHealthStatus(
address_and_status.first, address_and_status.second));
}
update.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(endpoints));
EXPECT_EQ(ApplyUpdate(update, lb_policy()), absl::OkStatus());
}

@ -414,17 +414,19 @@ class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
config->address().c_str());
auto uri = URI::Parse(config->address());
args.config.reset();
args.addresses = EndpointAddressesList();
EndpointAddressesList addresses;
if (uri.ok()) {
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(*uri, &address));
args.addresses->emplace_back(address, ChannelArgs());
addresses.emplace_back(address, ChannelArgs());
} else {
gpr_log(GPR_ERROR,
"%s: could not parse URI (%s), using empty address list",
kFixedAddressLbPolicyName, uri.status().ToString().c_str());
args.resolution_note = "no address in fixed_address_lb policy";
}
args.addresses =
std::make_shared<EndpointAddressesListIterator>(std::move(addresses));
return ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
}

Loading…
Cancel
Save