pass health check service name through LB policies via a channel arg (#26441)

* grab connected subchannel in the data plane

* don't return connected subchannel via state watch

* fix build

* fix build

* pass health check service name through LB policies via a channel arg

* fix build

* clang-format

* add trace log
Mark D. Roth authored 3 years ago; committed by GitHub
parent 5912aedb2b
commit cb8dafa248
4 changed files (lines changed per file):

  103  src/core/ext/filters/client_channel/client_channel.cc
    7  src/core/ext/filters/client_channel/client_channel.h
   28  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
    7  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
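
Before the file-by-file diff, a rough sketch of the mechanism this commit switches to: the health check service name from the service config is attached to the channel args passed into the LB policy update and read back when a subchannel is created, instead of being stored on the channel and pushed into existing subchannel wrappers. This is a minimal illustration only, using gRPC core channel-args helpers (grpc_channel_arg_string_create, grpc_channel_args_copy_and_add, grpc_channel_args_find_string, grpc_channel_args_find_bool); the functions AddHealthCheckServiceNameArg and GetHealthCheckServiceName are hypothetical names invented for the sketch, not code from this commit.

#include <string>

#include "absl/types/optional.h"

#include <grpc/grpc.h>

#include "src/core/lib/channel/channel_args.h"

#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME \
  "grpc.internal.health_check_service_name"

// Control-plane side (hypothetical helper): attach the name from the parsed
// service config to the channel args handed to the LB policy in UpdateArgs.
grpc_channel_args* AddHealthCheckServiceNameArg(
    const grpc_channel_args* args, const std::string& service_name) {
  grpc_arg arg = grpc_channel_arg_string_create(
      const_cast<char*>(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME),
      const_cast<char*>(service_name.c_str()));
  // The string value is copied into the new args, so service_name need not
  // outlive the returned grpc_channel_args.
  return grpc_channel_args_copy_and_add(args, &arg, 1);
}

// Subchannel-creation side (hypothetical helper): read the arg back, unless
// health checking is inhibited for this subchannel.  The diff below also
// strips the arg from the subchannel args so that it does not affect
// subchannel uniqueness.
absl::optional<std::string> GetHealthCheckServiceName(
    const grpc_channel_args* args) {
  const char* name = grpc_channel_args_find_string(
      args, GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
  if (name == nullptr) return absl::nullopt;
  if (grpc_channel_args_find_bool(args, GRPC_ARG_INHIBIT_HEALTH_CHECKING,
                                  false)) {
    return absl::nullopt;
  }
  return std::string(name);
}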

@@ -79,6 +79,9 @@
// Client channel filter
//
#define GRPC_ARG_HEALTH_CHECK_SERVICE_NAME \
"grpc.internal.health_check_service_name"
namespace grpc_core {
using internal::ClientChannelGlobalParsedConfig;
@@ -494,8 +497,7 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
grpc_connectivity_state CheckConnectivityState() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
grpc_connectivity_state CheckConnectivityState() override {
return subchannel_->CheckConnectivityState(health_check_service_name_);
}
@@ -539,41 +541,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
}
void UpdateHealthCheckServiceName(
absl::optional<std::string> health_check_service_name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p: subchannel wrapper %p: updating health check service "
"name from \"%s\" to \"%s\"",
chand_, this, health_check_service_name_->c_str(),
health_check_service_name->c_str());
}
for (auto& p : watcher_map_) {
WatcherWrapper*& watcher_wrapper = p.second;
// Cancel the current watcher and create a new one using the new
// health check service name.
// TODO(roth): If there is not already an existing health watch
// call for the new name, then the watcher will initially report
// state CONNECTING. If the LB policy is currently reporting
// state READY, this may cause it to switch to CONNECTING before
// switching back to READY. This could cause a small delay for
// RPCs being started on the channel. If/when this becomes a
// problem, we may be able to handle it by waiting for the new
// watcher to report READY before we use it to replace the old one.
WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
watcher_wrapper);
watcher_wrapper = replacement;
subchannel_->WatchConnectivityState(
replacement->last_seen_state(), health_check_service_name,
RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
replacement));
}
// Save the new health check service name.
health_check_service_name_ = std::move(health_check_service_name);
}
private:
// Subchannel and SubchannelInterface have different interfaces for
// their respective ConnectivityStateWatcherInterface classes.
@@ -698,14 +665,14 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
ClientChannel* chand_;
RefCountedPtr<Subchannel> subchannel_;
absl::optional<std::string> health_check_service_name_
ABSL_GUARDED_BY(&ClientChannel::work_serializer_);
absl::optional<std::string> health_check_service_name_;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WatcherWrapper that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls
// CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WatcherWrapper to cancel on the underlying subchannel.
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
ABSL_GUARDED_BY(&ClientChannel::work_serializer_);
};
//
@@ -902,15 +869,20 @@ class ClientChannel::ClientChannelControlHelper
ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
// Determine health check service name.
bool inhibit_health_checking = grpc_channel_args_find_bool(
&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING, false);
absl::optional<std::string> health_check_service_name;
if (!inhibit_health_checking) {
health_check_service_name = chand_->health_check_service_name_;
const char* health_check_service_name_arg = grpc_channel_args_find_string(
&args, GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
if (health_check_service_name_arg != nullptr) {
bool inhibit_health_checking = grpc_channel_args_find_bool(
&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING, false);
if (!inhibit_health_checking) {
health_check_service_name = health_check_service_name_arg;
}
}
// Construct channel args for subchannel.
// Remove channel args that should not affect subchannel uniqueness.
absl::InlinedVector<const char*, 3> args_to_remove = {
absl::InlinedVector<const char*, 4> args_to_remove = {
GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
GRPC_ARG_INHIBIT_HEALTH_CHECKING,
GRPC_ARG_CHANNELZ_CHANNEL_NODE,
};
@@ -1302,15 +1274,16 @@ void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
// If either has changed, apply the global parameters now.
if (service_config_changed || config_selector_changed) {
// Update service config in control plane.
UpdateServiceConfigInControlPlaneLocked(
std::move(service_config), std::move(config_selector),
parsed_service_config, lb_policy_config->name());
UpdateServiceConfigInControlPlaneLocked(std::move(service_config),
std::move(config_selector),
lb_policy_config->name());
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
}
// Create or update LB policy, as needed.
CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
std::move(result));
CreateOrUpdateLbPolicyLocked(
std::move(lb_policy_config),
parsed_service_config->health_check_service_name(), std::move(result));
if (service_config_changed || config_selector_changed) {
// Start using new service config for calls.
// This needs to happen after the LB policy has been updated, since
@@ -1376,17 +1349,25 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
void ClientChannel::CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) {
// Construct update.
LoadBalancingPolicy::UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
// Add health check service name to channel args.
absl::InlinedVector<grpc_arg, 1> args_to_add;
if (health_check_service_name.has_value()) {
args_to_add.push_back(grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME),
const_cast<char*>(health_check_service_name->c_str())));
}
// Remove the config selector from channel args so that we're not holding
// unnecessary refs that cause it to be destroyed somewhere other than in the
// WorkSerializer.
const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
update_args.args =
grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
const char* arg_to_remove = GRPC_ARG_CONFIG_SELECTOR;
update_args.args = grpc_channel_args_copy_and_add_and_remove(
result.args, &arg_to_remove, 1, args_to_add.data(), args_to_add.size());
// Create policy if needed.
if (lb_policy_ == nullptr) {
lb_policy_ = CreateLbPolicyLocked(*update_args.args);
@@ -1445,9 +1426,7 @@ void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
const char* lb_policy_name) {
RefCountedPtr<ConfigSelector> config_selector, const char* lb_policy_name) {
UniquePtr<char> service_config_json(
gpr_strdup(service_config->json_string().c_str()));
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
@@ -1457,17 +1436,6 @@ void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
}
// Save service config.
saved_service_config_ = std::move(service_config);
// Update health check service name if needed.
if (health_check_service_name_ !=
parsed_service_config->health_check_service_name()) {
health_check_service_name_ =
parsed_service_config->health_check_service_name();
// Update health check service name used by existing subchannel wrappers.
for (auto* subchannel_wrapper : subchannel_wrappers_) {
subchannel_wrapper->UpdateHealthCheckServiceName(
health_check_service_name_);
}
}
// Swap out the data used by GetChannelInfo().
UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
{
@@ -1525,7 +1493,6 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
//
// We defer unreffing the old values (and deallocating memory) until
// after releasing the lock to keep the critical section small.
std::set<grpc_call_element*> calls_pending_resolver_result;
{
MutexLock lock(&resolution_mu_);
GRPC_ERROR_UNREF(resolver_transient_failure_error_);

@@ -217,6 +217,7 @@ class ClientChannel {
void CreateOrUpdateLbPolicyLocked(
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
const absl::optional<std::string>& health_check_service_name,
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
const grpc_channel_args& args)
@@ -230,9 +231,7 @@ class ClientChannel {
void UpdateServiceConfigInControlPlaneLocked(
RefCountedPtr<ServiceConfig> service_config,
RefCountedPtr<ConfigSelector> config_selector,
const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
const char* lb_policy_name)
RefCountedPtr<ConfigSelector> config_selector, const char* lb_policy_name)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
void UpdateServiceConfigInDataPlaneLocked()
@@ -315,8 +314,6 @@ class ClientChannel {
ABSL_GUARDED_BY(work_serializer_);
RefCountedPtr<ConfigSelector> saved_config_selector_
ABSL_GUARDED_BY(work_serializer_);
absl::optional<std::string> health_check_service_name_
ABSL_GUARDED_BY(work_serializer_);
OrphanablePtr<LoadBalancingPolicy> lb_policy_
ABSL_GUARDED_BY(work_serializer_);
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_

@@ -377,15 +377,31 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
sd = subchannel_list()->subchannel(next_index);
// If we've tried all subchannels, set state to TRANSIENT_FAILURE.
if (sd->Index() == 0) {
// Re-resolve if this is the most recent subchannel list.
if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
? p->latest_pending_subchannel_list_.get()
: p->subchannel_list_.get())) {
p->channel_control_helper()->RequestReresolution();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p subchannel list %p failed to connect to "
"all subchannels",
p, subchannel_list());
}
subchannel_list()->set_in_transient_failure(true);
// Only report new state in case 1.
// In case 2, swap to the new subchannel list. This means reporting
// TRANSIENT_FAILURE and dropping the existing (working) connection,
// but we can't ignore what the control plane has told us.
if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO,
"Pick First %p promoting pending subchannel list %p to "
"replace %p",
p, p->latest_pending_subchannel_list_.get(),
p->subchannel_list_.get());
}
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
}
// If this is the current subchannel list (either because we were
// in case 1 or because we were in case 2 and just promoted it to
// be the current list), re-resolve and report new state.
if (subchannel_list() == p->subchannel_list_.get()) {
p->channel_control_helper()->RequestReresolution();
absl::Status status =
absl::UnavailableError("failed to connect to all addresses");
p->channel_control_helper()->UpdateState(

@@ -339,7 +339,12 @@ void RoundRobin::RoundRobinSubchannelList::
void RoundRobin::RoundRobinSubchannelList::
UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
RoundRobin* p = static_cast<RoundRobin*>(policy());
if (num_ready_ > 0) {
// If we have at least one READY subchannel, then swap to the new list.
// Also, if all of the subchannels are in TRANSIENT_FAILURE, then we know
// we've tried all of them and failed, so we go ahead and swap over
// anyway; this may cause the channel to go from READY to TRANSIENT_FAILURE,
// but we are doing what the control plane told us to do.
if (num_ready_ > 0 || num_transient_failure_ == num_subchannels()) {
if (p->subchannel_list_.get() != this) {
// Promote this list to p->subchannel_list_.
// This list must be p->latest_pending_subchannel_list_, because

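The round_robin hunk above adds a second condition for promoting the pending subchannel list; the pick_first hunk applies the same idea once it has tried every subchannel in the new list. Restated as a standalone predicate (a hypothetical, simplified helper written for illustration, not the actual policy code):

#include <cstddef>

// Decide whether to swap the pending subchannel list in as the current one.
// Swap once the new list has a READY subchannel, or once every subchannel in
// it has reported TRANSIENT_FAILURE (i.e. all of them were tried and failed).
// In the latter case the channel may drop from READY to TRANSIENT_FAILURE,
// but it keeps following what the control plane told it to use.
bool ShouldPromotePendingList(size_t num_ready, size_t num_transient_failure,
                              size_t num_subchannels) {
  return num_ready > 0 || num_transient_failure == num_subchannels;
}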