Pass address to CreateSubchannel() and expose attributes in SubchannelInterface.

pull/24172/head
Mark D. Roth 4 years ago
parent 3ee45eceef
commit f7a72b040d
  1. 73
      src/core/ext/filters/client_channel/client_channel.cc
  2. 2
      src/core/ext/filters/client_channel/lb_policy.h
  3. 5
      src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
  4. 7
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 4
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 6
      src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
  7. 6
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  8. 41
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  9. 6
      src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
  10. 7
      src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
  11. 7
      src/core/ext/filters/client_channel/lb_policy/xds/eds.cc
  12. 7
      src/core/ext/filters/client_channel/lb_policy/xds/lrs.cc
  13. 6
      src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
  14. 5
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  15. 4
      src/core/ext/filters/client_channel/server_address.h
  16. 6
      src/core/ext/filters/client_channel/subchannel_interface.h
  17. 96
      test/core/util/test_lb_policies.cc
  18. 9
      test/core/util/test_lb_policies.h
  19. 102
      test/cpp/end2end/client_lb_end2end_test.cc

@ -881,6 +881,9 @@ class CallData {
// ChannelData::SubchannelWrapper // ChannelData::SubchannelWrapper
// //
using ServerAddressAttributeMap =
std::map<const char*, std::unique_ptr<ServerAddress::AttributeInterface>>;
// This class is a wrapper for Subchannel that hides details of the // This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name and // channel's implementation (such as the health check service name and
// connected subchannel) from the LB policy API. // connected subchannel) from the LB policy API.
@ -892,11 +895,13 @@ class CallData {
class ChannelData::SubchannelWrapper : public SubchannelInterface { class ChannelData::SubchannelWrapper : public SubchannelInterface {
public: public:
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel, SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
grpc_core::UniquePtr<char> health_check_service_name) grpc_core::UniquePtr<char> health_check_service_name,
ServerAddressAttributeMap attributes)
: SubchannelInterface(&grpc_client_channel_routing_trace), : SubchannelInterface(&grpc_client_channel_routing_trace),
chand_(chand), chand_(chand),
subchannel_(subchannel), subchannel_(subchannel),
health_check_service_name_(std::move(health_check_service_name)) { health_check_service_name_(std::move(health_check_service_name)),
attributes_(std::move(attributes)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p: creating subchannel wrapper %p for subchannel %p", "chand=%p: creating subchannel wrapper %p for subchannel %p",
@ -974,14 +979,21 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
void ResetBackoff() override { subchannel_->ResetBackoff(); } void ResetBackoff() override { subchannel_->ResetBackoff(); }
void ThrottleKeepaliveTime(int new_keepalive_time) {
subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
}
const grpc_channel_args* channel_args() override { const grpc_channel_args* channel_args() override {
return subchannel_->channel_args(); return subchannel_->channel_args();
} }
const ServerAddress::AttributeInterface* GetAttribute(
const char* key) const override {
auto it = attributes_.find(key);
if (it == attributes_.end()) return nullptr;
return it->second.get();
}
void ThrottleKeepaliveTime(int new_keepalive_time) {
subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
}
void UpdateHealthCheckServiceName( void UpdateHealthCheckServiceName(
grpc_core::UniquePtr<char> health_check_service_name) { grpc_core::UniquePtr<char> health_check_service_name) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@ -1175,6 +1187,7 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
ChannelData* chand_; ChannelData* chand_;
Subchannel* subchannel_; Subchannel* subchannel_;
grpc_core::UniquePtr<char> health_check_service_name_; grpc_core::UniquePtr<char> health_check_service_name_;
ServerAddressAttributeMap attributes_;
// Maps from the address of the watcher passed to us by the LB policy // Maps from the address of the watcher passed to us by the LB policy
// to the address of the WrapperWatcher that we passed to the underlying // to the address of the WrapperWatcher that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls // subchannel. This is needed so that when the LB policy calls
@ -1349,6 +1362,18 @@ class ChannelData::ConnectivityWatcherRemover {
// ChannelData::ClientChannelControlHelper // ChannelData::ClientChannelControlHelper
// //
} // namespace
// Allows accessing the attributes from a ServerAddress.
class ChannelServerAddressPeer {
public:
static ServerAddressAttributeMap GetAttributes(ServerAddress* address) {
return std::move(address->attributes_);
}
};
namespace {
class ChannelData::ClientChannelControlHelper class ChannelData::ClientChannelControlHelper
: public LoadBalancingPolicy::ChannelControlHelper { : public LoadBalancingPolicy::ChannelControlHelper {
public: public:
@ -1362,7 +1387,8 @@ class ChannelData::ClientChannelControlHelper
} }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override { ServerAddress address, const grpc_channel_args& args) override {
// Determine health check service name.
bool inhibit_health_checking = grpc_channel_arg_get_bool( bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
grpc_core::UniquePtr<char> health_check_service_name; grpc_core::UniquePtr<char> health_check_service_name;
@ -1370,21 +1396,37 @@ class ChannelData::ClientChannelControlHelper
health_check_service_name.reset( health_check_service_name.reset(
gpr_strdup(chand_->health_check_service_name_.get())); gpr_strdup(chand_->health_check_service_name_.get()));
} }
// Remove channel args that should not affect subchannel uniqueness.
static const char* args_to_remove[] = { static const char* args_to_remove[] = {
GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_INHIBIT_HEALTH_CHECKING,
GRPC_ARG_CHANNELZ_CHANNEL_NODE, GRPC_ARG_CHANNELZ_CHANNEL_NODE,
}; };
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg( // Add channel args needed for the subchannel.
chand_->subchannel_pool_.get()); absl::InlinedVector<grpc_arg, 3> args_to_add = {
Subchannel::CreateSubchannelAddressArg(&address.address()),
SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get()),
};
if (address.args() != nullptr) {
for (size_t j = 0; j < address.args()->num_args; ++j) {
args_to_add.emplace_back(address.args()->args[j]);
}
}
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
&args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &arg, 1); &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[0].value.string);
// Create subchannel.
Subchannel* subchannel = Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args); chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args); grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) return nullptr; if (subchannel == nullptr) return nullptr;
// Make sure the subchannel has updated keepalive time.
subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_); subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
// Create and return wrapper for the subchannel.
return MakeRefCounted<SubchannelWrapper>( return MakeRefCounted<SubchannelWrapper>(
chand_, subchannel, std::move(health_check_service_name)); chand_, subchannel, std::move(health_check_service_name),
ChannelServerAddressPeer::GetAttributes(&address));
} }
void UpdateState( void UpdateState(
@ -1662,9 +1704,12 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
&new_args); &new_args);
target_uri_.reset(proxy_name != nullptr ? proxy_name target_uri_.reset(proxy_name != nullptr ? proxy_name
: gpr_strdup(server_uri)); : gpr_strdup(server_uri));
channel_args_ = new_args != nullptr // Strip out service config channel arg, so that it doesn't affect
? new_args // subchannel uniqueness when the args flow down to that layer.
: grpc_channel_args_copy(args->channel_args); const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
channel_args_ = grpc_channel_args_copy_and_remove(
new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
grpc_channel_args_destroy(new_args);
keepalive_time_ = grpc_channel_args_find_integer( keepalive_time_ = grpc_channel_args_find_integer(
channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS, channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
{-1 /* default value, unset */, 1, INT_MAX}); {-1 /* default value, unset */, 1, INT_MAX});

@ -279,7 +279,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Creates a new subchannel with the specified channel args. /// Creates a new subchannel with the specified channel args.
virtual RefCountedPtr<SubchannelInterface> CreateSubchannel( virtual RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) = 0; ServerAddress address, const grpc_channel_args& args) = 0;
/// Sets the connectivity state and returns a new picker to be used /// Sets the connectivity state and returns a new picker to be used
/// by the client channel. /// by the client channel.

@ -39,10 +39,11 @@ class ChildPolicyHandler::Helper
~Helper() { parent_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { parent_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override { ServerAddress address, const grpc_channel_args& args) override {
if (parent_->shutting_down_) return nullptr; if (parent_->shutting_down_) return nullptr;
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,

@ -302,7 +302,7 @@ class GrpcLb : public LoadBalancingPolicy {
: parent_(std::move(parent)) {} : parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
@ -654,9 +654,10 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
// //
RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr; if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args);
} }
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,

@ -84,9 +84,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData> { PickFirstSubchannelData> {
public: public:
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer, PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
const ServerAddressList& addresses, ServerAddressList addresses,
const grpc_channel_args& args) const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, : SubchannelList(policy, tracer, std::move(addresses),
policy->channel_control_helper(), args) { policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain // Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels' // any references to subchannels, since the subchannels'

@ -155,7 +155,7 @@ class PriorityLb : public LoadBalancingPolicy {
~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
const absl::Status& status, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
@ -736,10 +736,10 @@ void PriorityLb::ChildPriority::Helper::RequestReresolution() {
RefCountedPtr<SubchannelInterface> RefCountedPtr<SubchannelInterface>
PriorityLb::ChildPriority::Helper::CreateSubchannel( PriorityLb::ChildPriority::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (priority_->priority_policy_->shutting_down_) return nullptr; if (priority_->priority_policy_->shutting_down_) return nullptr;
return priority_->priority_policy_->channel_control_helper() return priority_->priority_policy_->channel_control_helper()
->CreateSubchannel(args); ->CreateSubchannel(std::move(address), args);
} }
void PriorityLb::ChildPriority::Helper::UpdateState( void PriorityLb::ChildPriority::Helper::UpdateState(

@ -112,9 +112,9 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData> { RoundRobinSubchannelData> {
public: public:
RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer, RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
const ServerAddressList& addresses, ServerAddressList addresses,
const grpc_channel_args& args) const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, : SubchannelList(policy, tracer, std::move(addresses),
policy->channel_control_helper(), args) { policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain // Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels' // any references to subchannels, since the subchannels'
@ -445,7 +445,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
} }
} }
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>( latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, args.addresses, *args.args); this, &grpc_lb_round_robin_trace, std::move(args.addresses), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) { if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the // If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE. // current list and transition to TRANSIENT_FAILURE.

@ -200,7 +200,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
protected: protected:
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
const ServerAddressList& addresses, ServerAddressList addresses,
LoadBalancingPolicy::ChannelControlHelper* helper, LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args); const grpc_channel_args& args);
@ -350,8 +350,7 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
template <typename SubchannelListType, typename SubchannelDataType> template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList( SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer, LoadBalancingPolicy* policy, TraceFlag* tracer, ServerAddressList addresses,
const ServerAddressList& addresses,
LoadBalancingPolicy::ChannelControlHelper* helper, LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args) const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer), : InternallyRefCounted<SubchannelListType>(tracer),
@ -363,50 +362,28 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
tracer_->name(), policy, this, addresses.size()); tracer_->name(), policy, this, addresses.size());
} }
subchannels_.reserve(addresses.size()); subchannels_.reserve(addresses.size());
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
// We remove the service config, since it will be passed into the
// subchannel via call context.
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_SERVICE_CONFIG};
// Create a subchannel for each address. // Create a subchannel for each address.
for (size_t i = 0; i < addresses.size(); i++) { for (const ServerAddress& address : addresses) {
absl::InlinedVector<grpc_arg, 3> args_to_add;
const size_t subchannel_address_arg_index = args_to_add.size();
args_to_add.emplace_back(
Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
if (addresses[i].args() != nullptr) {
for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
args_to_add.emplace_back(addresses[i].args()->args[j]);
}
}
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[subchannel_address_arg_index].value.string);
RefCountedPtr<SubchannelInterface> subchannel = RefCountedPtr<SubchannelInterface> subchannel =
helper->CreateSubchannel(*new_args); helper->CreateSubchannel(std::move(address), args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) { if (subchannel == nullptr) {
// Subchannel could not be created. // Subchannel could not be created.
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[%s %p] could not create subchannel for address uri %s, " "[%s %p] could not create subchannel for address %s, "
"ignoring", "ignoring",
tracer_->name(), policy_, tracer_->name(), policy_, address.ToString().c_str());
grpc_sockaddr_to_uri(&addresses[i].address()).c_str());
} }
continue; continue;
} }
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR "[%s %p] subchannel list %p index %" PRIuPTR
": Created subchannel %p for address uri %s", ": Created subchannel %p for address %s",
tracer_->name(), policy_, this, subchannels_.size(), tracer_->name(), policy_, this, subchannels_.size(),
subchannel.get(), subchannel.get(), address.ToString().c_str());
grpc_sockaddr_to_uri(&addresses[i].address()).c_str());
} }
subchannels_.emplace_back(this, addresses[i], std::move(subchannel)); subchannels_.emplace_back(this, address, std::move(subchannel));
} }
} }

@ -145,7 +145,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
const absl::Status& status, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
@ -590,10 +590,10 @@ void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked(
RefCountedPtr<SubchannelInterface> RefCountedPtr<SubchannelInterface>
WeightedTargetLb::WeightedChild::Helper::CreateSubchannel( WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr; if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr;
return weighted_child_->weighted_target_policy_->channel_control_helper() return weighted_child_->weighted_target_policy_->channel_control_helper()
->CreateSubchannel(args); ->CreateSubchannel(std::move(address), args);
} }
void WeightedTargetLb::WeightedChild::Helper::UpdateState( void WeightedTargetLb::WeightedChild::Helper::UpdateState(

@ -79,7 +79,7 @@ class CdsLb : public LoadBalancingPolicy {
public: public:
explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {} explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
@ -239,9 +239,10 @@ void CdsLb::ClusterWatcher::OnResourceDoesNotExist() {
// //
RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (parent_->shutting_down_) return nullptr; if (parent_->shutting_down_) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args);
} }
void CdsLb::Helper::UpdateState(grpc_connectivity_state state, void CdsLb::Helper::UpdateState(grpc_connectivity_state state,

@ -133,7 +133,7 @@ class EdsLb : public LoadBalancingPolicy {
~Helper() { eds_policy_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { eds_policy_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
// This is a no-op, because we get the addresses from the xds // This is a no-op, because we get the addresses from the xds
@ -261,9 +261,10 @@ EdsLb::PickResult EdsLb::DropPicker::Pick(PickArgs args) {
// //
RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> EdsLb::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (eds_policy_->shutting_down_) return nullptr; if (eds_policy_->shutting_down_) return nullptr;
return eds_policy_->channel_control_helper()->CreateSubchannel(args); return eds_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void EdsLb::Helper::UpdateState(grpc_connectivity_state state, void EdsLb::Helper::UpdateState(grpc_connectivity_state state,

@ -119,7 +119,7 @@ class LrsLb : public LoadBalancingPolicy {
~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
void RequestReresolution() override; void RequestReresolution() override;
@ -324,9 +324,10 @@ void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses,
// //
RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel( RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (lrs_policy_->shutting_down_) return nullptr; if (lrs_policy_->shutting_down_) return nullptr;
return lrs_policy_->channel_control_helper()->CreateSubchannel(args); return lrs_policy_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void LrsLb::Helper::UpdateState(grpc_connectivity_state state, void LrsLb::Helper::UpdateState(grpc_connectivity_state state,

@ -154,7 +154,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
~Helper() { xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper"); } ~Helper() { xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override; ServerAddress address, const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state, void UpdateState(grpc_connectivity_state state,
const absl::Status& status, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override; std::unique_ptr<SubchannelPicker> picker) override;
@ -546,12 +546,12 @@ void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
RefCountedPtr<SubchannelInterface> RefCountedPtr<SubchannelInterface>
XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel( XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
const grpc_channel_args& args) { ServerAddress address, const grpc_channel_args& args) {
if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_)
return nullptr; return nullptr;
return xds_cluster_manager_child_->xds_cluster_manager_policy_ return xds_cluster_manager_child_->xds_cluster_manager_policy_
->channel_control_helper() ->channel_control_helper()
->CreateSubchannel(args); ->CreateSubchannel(std::move(address), args);
} }
void XdsClusterManagerLb::ClusterChild::Helper::UpdateState( void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(

@ -109,9 +109,10 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
: parent_(std::move(parent)) {} : parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override { ServerAddress address, const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down. if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,

@ -97,6 +97,10 @@ class ServerAddress {
std::string ToString() const; std::string ToString() const;
private: private:
// Allows the channel to access the attributes without knowing the keys.
// (We intentionally do not allow LB policies to do this.)
friend class ChannelServerAddressPeer;
grpc_resolved_address address_; grpc_resolved_address address_;
grpc_channel_args* args_; grpc_channel_args* args_;
std::map<const char*, std::unique_ptr<AttributeInterface>> attributes_; std::map<const char*, std::unique_ptr<AttributeInterface>> attributes_;

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -87,6 +88,11 @@ class SubchannelInterface : public RefCounted<SubchannelInterface> {
// TODO(roth): Need a better non-grpc-specific abstraction here. // TODO(roth): Need a better non-grpc-specific abstraction here.
virtual const grpc_channel_args* channel_args() = 0; virtual const grpc_channel_args* channel_args() = 0;
// Allows accessing the attributes associated with the address for
// this subchannel.
virtual const ServerAddress::AttributeInterface* GetAttribute(
const char* key) const = 0;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -138,8 +138,9 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
: parent_(std::move(parent)), cb_(std::move(cb)) {} : parent_(std::move(parent)), cb_(std::move(cb)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override { ServerAddress address, const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
@ -248,8 +249,9 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
: parent_(std::move(parent)), cb_(std::move(cb)) {} : parent_(std::move(parent)), cb_(std::move(cb)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel( RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override { ServerAddress address, const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateSubchannel(args); return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
} }
void UpdateState(grpc_connectivity_state state, const absl::Status& status, void UpdateState(grpc_connectivity_state state, const absl::Status& status,
@ -331,6 +333,87 @@ class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
InterceptRecvTrailingMetadataCallback cb_; InterceptRecvTrailingMetadataCallback cb_;
}; };
//
// AddressTestLoadBalancingPolicy
//
constexpr char kAddressTestLbPolicyName[] = "address_test_lb";
class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
public:
AddressTestLoadBalancingPolicy(Args args, AddressTestCallback cb)
: ForwardingLoadBalancingPolicy(
absl::make_unique<Helper>(
RefCountedPtr<AddressTestLoadBalancingPolicy>(this),
std::move(cb)),
std::move(args),
/*delegate_lb_policy_name=*/"pick_first",
/*initial_refcount=*/2) {}
~AddressTestLoadBalancingPolicy() override = default;
const char* name() const override { return kAddressTestLbPolicyName; }
private:
class Helper : public ChannelControlHelper {
public:
Helper(RefCountedPtr<AddressTestLoadBalancingPolicy> parent,
AddressTestCallback cb)
: parent_(std::move(parent)), cb_(std::move(cb)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
ServerAddress address, const grpc_channel_args& args) override {
cb_(address);
return parent_->channel_control_helper()->CreateSubchannel(
std::move(address), args);
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
parent_->channel_control_helper()->UpdateState(state, status,
std::move(picker));
}
void RequestReresolution() override {
parent_->channel_control_helper()->RequestReresolution();
}
void AddTraceEvent(TraceSeverity severity,
absl::string_view message) override {
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
private:
RefCountedPtr<AddressTestLoadBalancingPolicy> parent_;
AddressTestCallback cb_;
};
};
class AddressTestConfig : public LoadBalancingPolicy::Config {
public:
const char* name() const override { return kAddressTestLbPolicyName; }
};
class AddressTestFactory : public LoadBalancingPolicyFactory {
public:
explicit AddressTestFactory(AddressTestCallback cb) : cb_(std::move(cb)) {}
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return MakeOrphanable<AddressTestLoadBalancingPolicy>(std::move(args), cb_);
}
const char* name() const override { return kAddressTestLbPolicyName; }
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
const Json& /*json*/, grpc_error** /*error*/) const override {
return MakeRefCounted<AddressTestConfig>();
}
private:
AddressTestCallback cb_;
};
} // namespace } // namespace
void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb) { void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb) {
@ -344,4 +427,9 @@ void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
absl::make_unique<InterceptTrailingFactory>(std::move(cb))); absl::make_unique<InterceptTrailingFactory>(std::move(cb)));
} }
void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb) {
LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
absl::make_unique<AddressTestFactory>(std::move(cb)));
}
} // namespace grpc_core } // namespace grpc_core

@ -45,11 +45,16 @@ using InterceptRecvTrailingMetadataCallback =
std::function<void(const TrailingMetadataArgsSeen&)>; std::function<void(const TrailingMetadataArgsSeen&)>;
// Registers an LB policy called "intercept_trailing_metadata_lb" that // Registers an LB policy called "intercept_trailing_metadata_lb" that
// invokes cb with argument user_data when trailing metadata is received // invokes cb when trailing metadata is received for each call.
// for each call.
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy( void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb); InterceptRecvTrailingMetadataCallback cb);
using AddressTestCallback = std::function<void(const ServerAddress&)>;
// Registers an LB policy called "address_test_lb" that invokes cb for each
// address used to create a subchannel.
void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb);
} // namespace grpc_core } // namespace grpc_core
#endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H #endif // GRPC_TEST_CORE_UTIL_TEST_LB_POLICIES_H

@ -161,11 +161,14 @@ class FakeResolverResponseGeneratorWrapper {
response_generator_ = std::move(other.response_generator_); response_generator_ = std::move(other.response_generator_);
} }
void SetNextResolution(const std::vector<int>& ports, void SetNextResolution(
const char* service_config_json = nullptr) { const std::vector<int>& ports, const char* service_config_json = nullptr,
const char* attribute_key = nullptr,
std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
nullptr) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
response_generator_->SetResponse( response_generator_->SetResponse(BuildFakeResults(
BuildFakeResults(ports, service_config_json)); ports, service_config_json, attribute_key, std::move(attribute)));
} }
void SetNextResolutionUponError(const std::vector<int>& ports) { void SetNextResolutionUponError(const std::vector<int>& ports) {
@ -184,8 +187,10 @@ class FakeResolverResponseGeneratorWrapper {
private: private:
static grpc_core::Resolver::Result BuildFakeResults( static grpc_core::Resolver::Result BuildFakeResults(
const std::vector<int>& ports, const std::vector<int>& ports, const char* service_config_json = nullptr,
const char* service_config_json = nullptr) { const char* attribute_key = nullptr,
std::unique_ptr<grpc_core::ServerAddress::AttributeInterface> attribute =
nullptr) {
grpc_core::Resolver::Result result; grpc_core::Resolver::Result result;
for (const int& port : ports) { for (const int& port : ports) {
std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port); std::string lb_uri_str = absl::StrCat("ipv4:127.0.0.1:", port);
@ -193,8 +198,14 @@ class FakeResolverResponseGeneratorWrapper {
GPR_ASSERT(lb_uri != nullptr); GPR_ASSERT(lb_uri != nullptr);
grpc_resolved_address address; grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(lb_uri, &address)); GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
std::map<const char*,
std::unique_ptr<grpc_core::ServerAddress::AttributeInterface>>
attributes;
if (attribute != nullptr) {
attributes[attribute_key] = attribute->Copy();
}
result.addresses.emplace_back(address.addr, address.len, result.addresses.emplace_back(address.addr, address.len,
nullptr /* args */); nullptr /* args */, std::move(attributes));
grpc_uri_destroy(lb_uri); grpc_uri_destroy(lb_uri);
} }
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
@ -1887,6 +1898,83 @@ TEST_F(ClientLbInterceptTrailingMetadataTest, BackendMetricData) {
EXPECT_EQ(kNumRpcs, trailers_intercepted()); EXPECT_EQ(kNumRpcs, trailers_intercepted());
} }
class ClientLbAddressTest : public ClientLbEnd2endTest {
protected:
static const char* kAttributeKey;
class Attribute : public grpc_core::ServerAddress::AttributeInterface {
public:
explicit Attribute(const std::string& str) : str_(str) {}
std::unique_ptr<AttributeInterface> Copy() const override {
return absl::make_unique<Attribute>(str_);
}
int Cmp(const AttributeInterface* other) const override {
return str_.compare(static_cast<const Attribute*>(other)->str_);
}
std::string ToString() const override { return str_; }
private:
std::string str_;
};
void SetUp() override {
ClientLbEnd2endTest::SetUp();
current_test_instance_ = this;
}
static void SetUpTestCase() {
grpc_init();
grpc_core::RegisterAddressTestLoadBalancingPolicy(SaveAddress);
}
static void TearDownTestCase() { grpc_shutdown_blocking(); }
const std::vector<std::string>& addresses_seen() {
grpc::internal::MutexLock lock(&mu_);
return addresses_seen_;
}
private:
static void SaveAddress(const grpc_core::ServerAddress& address) {
ClientLbAddressTest* self = current_test_instance_;
grpc::internal::MutexLock lock(&self->mu_);
self->addresses_seen_.emplace_back(address.ToString());
}
static ClientLbAddressTest* current_test_instance_;
grpc::internal::Mutex mu_;
std::vector<std::string> addresses_seen_;
};
const char* ClientLbAddressTest::kAttributeKey = "attribute_key";
ClientLbAddressTest* ClientLbAddressTest::current_test_instance_ = nullptr;
TEST_F(ClientLbAddressTest, Basic) {
const int kNumServers = 1;
StartServers(kNumServers);
auto response_generator = BuildResolverResponseGenerator();
auto channel = BuildChannel("address_test_lb", response_generator);
auto stub = BuildStub(channel);
// Addresses returned by the resolver will have attached attributes.
response_generator.SetNextResolution(GetServersPorts(), nullptr,
kAttributeKey,
absl::make_unique<Attribute>("foo"));
CheckRpcSendOk(stub, DEBUG_LOCATION);
// Check LB policy name for the channel.
EXPECT_EQ("address_test_lb", channel->GetLoadBalancingPolicyName());
// Make sure that the attributes wind up on the subchannels.
std::vector<std::string> expected;
for (const int port : GetServersPorts()) {
expected.emplace_back(absl::StrCat(
"127.0.0.1:", port, " args={} attributes={", kAttributeKey, "=foo}"));
}
EXPECT_EQ(addresses_seen(), expected);
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save