Health checking service name to be passed as a channel arg for now

reviewable/pr18746/r3
Yash Tibrewal 6 years ago
parent 70839d966f
commit 4309a98b66
  1. 32
      src/core/ext/filters/client_channel/client_channel.cc
  2. 5
      src/core/ext/filters/client_channel/client_channel_factory.h
  3. 1
      src/core/ext/filters/client_channel/health/health_check_parser.cc
  4. 6
      src/core/ext/filters/client_channel/lb_policy.cc
  5. 6
      src/core/ext/filters/client_channel/lb_policy.h
  6. 11
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  7. 8
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  8. 8
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  9. 7
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  10. 10
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  11. 20
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  12. 6
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  13. 13
      src/core/ext/filters/client_channel/subchannel.cc
  14. 6
      src/core/ext/filters/client_channel/subchannel.h
  15. 6
      src/core/ext/transport/chttp2/client/insecure/channel_create.cc
  16. 6
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
  17. 7
      test/core/util/test_lb_policies.cc
  18. 3
      test/cpp/microbenchmarks/bm_call_create.cc

@ -224,8 +224,7 @@ class ChannelData {
static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const HealthCheckParsedObject** health_check);
const ParsedLoadBalancingConfig** lb_policy_config);
grpc_error* DoPingLocked(grpc_transport_op* op);
@ -932,15 +931,26 @@ class ChannelData::ClientChannelControlHelper
"ClientChannelControlHelper");
}
Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) override {
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
grpc_arg args_to_add[2];
int num_args_to_add = 0;
if (chand_->service_config_ != nullptr) {
/*const auto* health_check_object = static_cast<HealthCheckParsedObject*>(
chand_->service_config_->GetParsedGlobalServiceConfigObject(
HealthCheckParser::ParserIndex()));
if (health_check_object != nullptr) {
args_to_add[0] = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(health_check_object->service_name()));
num_args_to_add++;
}*/
}
args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, &arg, 1);
Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(
new_args, health_check);
grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
return subchannel;
}
@ -1111,8 +1121,7 @@ ChannelData::~ChannelData() {
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const HealthCheckParsedObject** health_check) {
const ParsedLoadBalancingConfig** lb_policy_config) {
ChannelData* chand = static_cast<ChannelData*>(arg);
ProcessedResolverResult resolver_result(result);
const char* service_config_json = resolver_result.service_config_json();
@ -1140,7 +1149,6 @@ bool ChannelData::ProcessResolverResultLocked(
// Return results.
*lb_policy_name = chand->info_lb_policy_name_.get();
*lb_policy_config = resolver_result.lb_policy_config();
*health_check = resolver_result.health_check();
return service_config_changed;
}

@ -34,9 +34,8 @@ class ClientChannelFactory {
virtual ~ClientChannelFactory() = default;
// Creates a subchannel with the specified args.
virtual Subchannel* CreateSubchannel(
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check) GRPC_ABSTRACT;
virtual Subchannel* CreateSubchannel(const grpc_channel_args* args)
GRPC_ABSTRACT;
// Creates a channel for the specified target with the specified args.
virtual grpc_channel* CreateChannel(

@ -61,6 +61,7 @@ UniquePtr<ServiceConfigParsedObject> HealthCheckParser::ParseGlobalParams(
}
}
}
if (service_name == nullptr) return nullptr;
return UniquePtr<ServiceConfigParsedObject>(
New<HealthCheckParsedObject>(service_name));
}

@ -69,15 +69,12 @@ void LoadBalancingPolicy::ShutdownAndUnrefLocked(void* arg,
LoadBalancingPolicy::UpdateArgs::UpdateArgs(const UpdateArgs& other) {
addresses = other.addresses;
config = other.config;
health_check = other.health_check;
args = grpc_channel_args_copy(other.args);
}
LoadBalancingPolicy::UpdateArgs::UpdateArgs(UpdateArgs&& other) {
addresses = std::move(other.addresses);
config = std::move(other.config);
health_check = other.health_check;
other.health_check = nullptr;
// TODO(roth): Use std::move() once channel args is converted to C++.
args = other.args;
other.args = nullptr;
@ -87,7 +84,6 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=(
const UpdateArgs& other) {
addresses = other.addresses;
config = other.config;
health_check = other.health_check;
grpc_channel_args_destroy(args);
args = grpc_channel_args_copy(other.args);
return *this;
@ -97,8 +93,6 @@ LoadBalancingPolicy::UpdateArgs& LoadBalancingPolicy::UpdateArgs::operator=(
UpdateArgs&& other) {
addresses = std::move(other.addresses);
config = std::move(other.config);
health_check = other.health_check;
other.health_check = nullptr;
// TODO(roth): Use std::move() once channel args is converted to C++.
grpc_channel_args_destroy(args);
args = other.args;

@ -185,9 +185,8 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
virtual ~ChannelControlHelper() = default;
/// Creates a new subchannel with the specified channel args.
virtual Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) GRPC_ABSTRACT;
virtual Subchannel* CreateSubchannel(const grpc_channel_args& args)
GRPC_ABSTRACT;
/// Creates a channel with the specified target and channel args.
/// This can be used in cases where the LB policy needs to create a
@ -211,7 +210,6 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
struct UpdateArgs {
ServerAddressList addresses;
const ParsedLoadBalancingConfig* config = nullptr;
const HealthCheckParsedObject* health_check = nullptr;
const grpc_channel_args* args = nullptr;
// TODO(roth): Remove everything below once channel args is

@ -292,9 +292,7 @@ class GrpcLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<GrpcLb> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) override;
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
@ -619,15 +617,12 @@ bool GrpcLb::Helper::CalledByCurrentChild() const {
return child_ == parent_->child_policy_.get();
}
Subchannel* GrpcLb::Helper::CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) {
Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args,
health_check);
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,

@ -88,10 +88,9 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
grpc_combiner* combiner,
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check)
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args, health_check) {
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -256,8 +255,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args,
args.health_check);
this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args);
grpc_channel_args_destroy(new_args);
if (subchannel_list->num_subchannels() == 0) {
// Empty update or no valid subchannels. Unsubscribe from all current

@ -109,10 +109,9 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
const ServerAddressList& addresses,
grpc_combiner* combiner,
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check)
const grpc_channel_args& args)
: SubchannelList(policy, tracer, addresses, combiner,
policy->channel_control_helper(), args, health_check) {
policy->channel_control_helper(), args) {
// Need to maintain a ref to the LB policy as long as we maintain
// any references to subchannels, since the subchannels'
// pollset_sets will include the LB policy's pollset_set.
@ -481,8 +480,7 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
}
}
latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args,
args.health_check);
this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args);
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.

@ -232,8 +232,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check);
const grpc_channel_args& args);
virtual ~SubchannelList();
@ -486,7 +485,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
LoadBalancingPolicy* policy, TraceFlag* tracer,
const ServerAddressList& addresses, grpc_combiner* combiner,
LoadBalancingPolicy::ChannelControlHelper* helper,
const grpc_channel_args& args, const HealthCheckParsedObject* health_check)
const grpc_channel_args& args)
: InternallyRefCounted<SubchannelListType>(tracer),
policy_(policy),
tracer_(tracer),
@ -521,7 +520,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[subchannel_address_arg_index].value.string);
Subchannel* subchannel = helper->CreateSubchannel(*new_args, health_check);
Subchannel* subchannel = helper->CreateSubchannel(*new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {
// Subchannel could not be created.

@ -323,9 +323,7 @@ class XdsLb : public LoadBalancingPolicy {
explicit Helper(RefCountedPtr<LocalityEntry> entry)
: entry_(std::move(entry)) {}
Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) override;
Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
grpc_channel* CreateChannel(const char* target,
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
@ -1574,14 +1572,12 @@ bool XdsLb::LocalityMap::LocalityEntry::Helper::CalledByCurrentChild() const {
}
Subchannel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) {
const grpc_channel_args& args) {
if (entry_->parent_->shutting_down_ ||
(!CalledByPendingChild() && !CalledByCurrentChild())) {
return nullptr;
}
return entry_->parent_->channel_control_helper()->CreateSubchannel(
args, health_check);
return entry_->parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* XdsLb::LocalityMap::LocalityEntry::Helper::CreateChannel(

@ -106,13 +106,10 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) override {
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent_->channel_control_helper()->CreateSubchannel(args,
health_check);
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* CreateChannel(const char* target,
@ -336,8 +333,7 @@ void ResolvingLoadBalancingPolicy::OnResolverError(grpc_error* error) {
void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
const ParsedLoadBalancingConfig* lb_policy_config, Resolver::Result result,
TraceStringVector* trace_strings,
const HealthCheckParsedObject* health_check) {
TraceStringVector* trace_strings) {
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
@ -430,7 +426,6 @@ void ResolvingLoadBalancingPolicy::CreateOrUpdateLbPolicyLocked(
UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
update_args.health_check = health_check;
// TODO(roth): Once channel args is converted to C++, use std::move() here.
update_args.args = result.args;
result.args = nullptr;
@ -535,12 +530,11 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
// Process the resolver result.
const char* lb_policy_name = nullptr;
const ParsedLoadBalancingConfig* lb_policy_config = nullptr;
const HealthCheckParsedObject* health_check = nullptr;
bool service_config_changed = false;
if (process_resolver_result_ != nullptr) {
service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &health_check);
service_config_changed =
process_resolver_result_(process_resolver_result_user_data_, result,
&lb_policy_name, &lb_policy_config);
} else {
lb_policy_name = child_policy_name_.get();
lb_policy_config = child_lb_config_;
@ -548,7 +542,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
GPR_ASSERT(lb_policy_name != nullptr);
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(lb_policy_name, lb_policy_config,
std::move(result), &trace_strings, health_check);
std::move(result), &trace_strings);
// Add channel trace event.
if (channelz_node() != nullptr) {
if (service_config_changed) {

@ -68,8 +68,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
typedef bool (*ProcessResolverResultCallback)(
void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
const ParsedLoadBalancingConfig** lb_policy_config,
const HealthCheckParsedObject** health_check);
const ParsedLoadBalancingConfig** lb_policy_config);
// If error is set when this returns, then construction failed, and
// the caller may not use the new object.
ResolvingLoadBalancingPolicy(
@ -108,8 +107,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
void CreateOrUpdateLbPolicyLocked(
const char* lb_policy_name,
const ParsedLoadBalancingConfig* lb_policy_config,
Resolver::Result result, TraceStringVector* trace_strings,
const HealthCheckParsedObject* health_check);
Resolver::Result result, TraceStringVector* trace_strings);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const char* lb_policy_name, const grpc_channel_args& args,
TraceStringVector* trace_strings);

@ -532,8 +532,7 @@ BackOff::Options ParseArgsForBackoffValues(
} // namespace
Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check)
const grpc_channel_args* args)
: key_(key),
connector_(connector),
backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
@ -565,10 +564,9 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
"subchannel");
grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
"subchannel");
if (health_check != nullptr) {
health_check_service_name_ =
UniquePtr<char>(gpr_strdup(health_check->service_name()));
}
UniquePtr<char>(gpr_strdup(grpc_channel_arg_get_string(
grpc_channel_args_find(args_, "grpc.temp.health_check"))));
const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
const bool channelz_enabled =
grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
@ -603,8 +601,7 @@ Subchannel::~Subchannel() {
}
Subchannel* Subchannel::Create(grpc_connector* connector,
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check) {
const grpc_channel_args* args) {
SubchannelKey* key = New<SubchannelKey>(args);
SubchannelPoolInterface* subchannel_pool =
SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
@ -614,7 +611,7 @@ Subchannel* Subchannel::Create(grpc_connector* connector,
Delete(key);
return c;
}
c = New<Subchannel>(key, connector, args, health_check);
c = New<Subchannel>(key, connector, args);
// Try to register the subchannel before setting the subchannel pool.
// Otherwise, in case of a registration race, unreffing c in
// RegisterSubchannel() will cause c to be tried to be unregistered, while

@ -179,14 +179,12 @@ class Subchannel {
public:
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check);
const grpc_channel_args* args);
~Subchannel();
// Creates a subchannel given \a connector and \a args.
static Subchannel* Create(grpc_connector* connector,
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check);
const grpc_channel_args* args);
// Strong and weak refcounting.
Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);

@ -37,13 +37,11 @@ namespace grpc_core {
class Chttp2InsecureClientChannelFactory : public ClientChannelFactory {
public:
Subchannel* CreateSubchannel(
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check) override {
Subchannel* CreateSubchannel(const grpc_channel_args* args) override {
grpc_channel_args* new_args =
grpc_default_authority_add_if_not_present(args);
grpc_connector* connector = grpc_chttp2_connector_create();
Subchannel* s = Subchannel::Create(connector, new_args, health_check);
Subchannel* s = Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
grpc_channel_args_destroy(new_args);
return s;

@ -44,9 +44,7 @@ namespace grpc_core {
class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
public:
Subchannel* CreateSubchannel(
const grpc_channel_args* args,
const HealthCheckParsedObject* health_check) override {
Subchannel* CreateSubchannel(const grpc_channel_args* args) override {
grpc_channel_args* new_args = GetSecureNamingChannelArgs(args);
if (new_args == nullptr) {
gpr_log(GPR_ERROR,
@ -54,7 +52,7 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
Subchannel* s = Subchannel::Create(connector, new_args, health_check);
Subchannel* s = Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
grpc_channel_args_destroy(new_args);
return s;

@ -141,11 +141,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
InterceptRecvTrailingMetadataCallback cb, void* user_data)
: parent_(std::move(parent)), cb_(cb), user_data_(user_data) {}
Subchannel* CreateSubchannel(
const grpc_channel_args& args,
const HealthCheckParsedObject* health_check) override {
return parent_->channel_control_helper()->CreateSubchannel(args,
health_check);
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
return parent_->channel_control_helper()->CreateSubchannel(args);
}
grpc_channel* CreateChannel(const char* target,

@ -322,8 +322,7 @@ static void DoNothing(void* arg, grpc_error* error) {}
class FakeClientChannelFactory : public grpc_core::ClientChannelFactory {
public:
grpc_core::Subchannel* CreateSubchannel(
const grpc_channel_args* args,
const grpc_core::HealthCheckParsedObject* health_check) override {
const grpc_channel_args* args) override {
return nullptr;
}
grpc_channel* CreateChannel(const char* target,

Loading…
Cancel
Save