Merge pull request #18839 from markdroth/subchannel_watcher

Revamp subchannel connectivity state monitoring APIs.
pull/18910/head
Mark D. Roth 6 years ago committed by GitHub
commit e802932a5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  3. 29
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  4. 10
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  5. 360
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  6. 2
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  7. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  8. 428
      src/core/ext/filters/client_channel/subchannel.cc
  9. 140
      src/core/ext/filters/client_channel/subchannel.h
  10. 67
      test/cpp/end2end/client_lb_end2end_test.cc

@ -222,7 +222,7 @@ class ChannelData {
~ChannelData();
static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);
@ -271,7 +271,6 @@ class ChannelData {
OrphanablePtr<LoadBalancingPolicy> resolving_lb_policy_;
grpc_connectivity_state_tracker state_tracker_;
ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_;
UniquePtr<char> health_check_service_name_;
RefCountedPtr<ServiceConfig> saved_service_config_;
bool received_first_resolver_result_ = false;
@ -951,18 +950,10 @@ class ChannelData::ClientChannelControlHelper
}
Subchannel* CreateSubchannel(const grpc_channel_args& args) override {
grpc_arg args_to_add[2];
int num_args_to_add = 0;
if (chand_->health_check_service_name_ != nullptr) {
args_to_add[0] = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(chand_->health_check_service_name_.get()));
num_args_to_add++;
}
args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg(
grpc_arg arg = SubchannelPoolInterface::CreateChannelArg(
chand_->subchannel_pool_.get());
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add);
grpc_channel_args_copy_and_add(&args, &arg, 1);
Subchannel* subchannel =
chand_->client_channel_factory_->CreateSubchannel(new_args);
grpc_channel_args_destroy(new_args);
@ -1201,14 +1192,14 @@ void ChannelData::ProcessLbPolicy(
// Synchronous callback from ResolvingLoadBalancingPolicy to process a
// resolver result update.
bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name,
void* arg, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg);
RefCountedPtr<ServiceConfig> service_config;
// If resolver did not return a service config or returned an invalid service
// config, we need a fallback service config.
if (result.service_config_error != GRPC_ERROR_NONE) {
if (result->service_config_error != GRPC_ERROR_NONE) {
// If the service config was invalid, then fallback to the saved service
// config. If there is no saved config either, use the default service
// config.
@ -1229,7 +1220,7 @@ bool ChannelData::ProcessResolverResultLocked(
}
service_config = chand->default_service_config_;
}
} else if (result.service_config == nullptr) {
} else if (result->service_config == nullptr) {
if (chand->default_service_config_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1240,15 +1231,15 @@ bool ChannelData::ProcessResolverResultLocked(
service_config = chand->default_service_config_;
}
} else {
service_config = result.service_config;
service_config = result->service_config;
}
*service_config_error = GRPC_ERROR_REF(result.service_config_error);
*service_config_error = GRPC_ERROR_REF(result->service_config_error);
if (service_config == nullptr &&
result.service_config_error != GRPC_ERROR_NONE) {
result->service_config_error != GRPC_ERROR_NONE) {
return false;
}
UniquePtr<char> service_config_json;
// Process service config.
UniquePtr<char> service_config_json;
const internal::ClientChannelGlobalParsedObject* parsed_service_config =
nullptr;
if (service_config != nullptr) {
@ -1257,6 +1248,20 @@ bool ChannelData::ProcessResolverResultLocked(
service_config->GetParsedGlobalServiceConfigObject(
internal::ClientChannelServiceConfigParser::ParserIndex()));
}
// TODO(roth): Eliminate this hack as part of hiding health check
// service name from LB policy API. As part of this, change the API
// for this function to pass in result as a const reference.
if (parsed_service_config != nullptr &&
parsed_service_config->health_check_service_name() != nullptr) {
grpc_arg new_arg = grpc_channel_arg_string_create(
const_cast<char*>("grpc.temp.health_check"),
const_cast<char*>(parsed_service_config->health_check_service_name()));
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add(result->args, &new_arg, 1);
grpc_channel_args_destroy(result->args);
result->args = new_args;
}
// Check if the config has changed.
const bool service_config_changed =
((service_config == nullptr) !=
(chand->saved_service_config_ == nullptr)) ||
@ -1273,12 +1278,6 @@ bool ChannelData::ProcessResolverResultLocked(
chand, service_config_json.get());
}
chand->saved_service_config_ = std::move(service_config);
if (parsed_service_config != nullptr) {
chand->health_check_service_name_.reset(
gpr_strdup(parsed_service_config->health_check_service_name()));
} else {
chand->health_check_service_name_.reset();
}
}
// We want to set the service config at least once. This should not really be
// needed, but we are doing it as a defensive approach. This can be removed,
@ -1296,7 +1295,7 @@ bool ChannelData::ProcessResolverResultLocked(
chand->saved_service_config_);
}
UniquePtr<char> processed_lb_policy_name;
chand->ProcessLbPolicy(result, parsed_service_config,
chand->ProcessLbPolicy(*result, parsed_service_config,
&processed_lb_policy_name, lb_policy_config);
// Swap out the data used by GetChannelInfo().
{

@ -127,7 +127,9 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
state = subchannel_->CheckConnectivity(true /* inhibit_health_checking */);
state = subchannel_->CheckConnectivityState(
nullptr /* health_check_service_name */,
nullptr /* connected_subchannel */);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);

@ -68,9 +68,8 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}
void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) override;
@ -312,6 +311,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// here, since we've already checked the initial connectivity
// state of all subchannels above.
subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
}
} else {
// We do have a selected subchannel (which means it's READY), so keep
@ -334,6 +334,9 @@ void PickFirst::UpdateLocked(UpdateArgs args) {
// state of all subchannels above.
latest_pending_subchannel_list_->subchannel(0)
->StartConnectivityWatchLocked();
latest_pending_subchannel_list_->subchannel(0)
->subchannel()
->AttemptToConnect();
}
}
}
@ -366,7 +369,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_.get());
}
p->selected_ = nullptr;
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked(
"selected subchannel failed; switching to pending update");
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
@ -391,7 +395,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->idle_ = true;
p->channel_control_helper()->RequestReresolution();
p->selected_ = nullptr;
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked("selected subchannel failed; going IDLE");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_IDLE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
@ -408,8 +412,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
connectivity_state,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
}
// Renew notification.
RenewConnectivityWatchLocked();
}
}
return;
@ -426,13 +428,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->set_in_transient_failure(false);
switch (connectivity_state) {
case GRPC_CHANNEL_READY: {
// Renew notification.
RenewConnectivityWatchLocked();
ProcessUnselectedReadyLocked();
break;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
CancelConnectivityWatchLocked("connection attempt failed");
PickFirstSubchannelData* sd = this;
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
@ -468,8 +468,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
GRPC_CHANNEL_CONNECTING,
UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
}
// Renew notification.
RenewConnectivityWatchLocked();
break;
}
case GRPC_CHANNEL_SHUTDOWN:
@ -521,8 +519,11 @@ void PickFirst::PickFirstSubchannelData::
// If current state is READY, select the subchannel now, since we started
// watching from this state and will not get a notification of it
// transitioning into this state.
if (p->selected_ != this && current_state == GRPC_CHANNEL_READY) {
ProcessUnselectedReadyLocked();
// If the current state is not READY, attempt to connect.
if (current_state == GRPC_CHANNEL_READY) {
if (p->selected_ != this) ProcessUnselectedReadyLocked();
} else {
subchannel()->AttemptToConnect();
}
}

@ -83,9 +83,8 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}
const ServerAddress& address, Subchannel* subchannel)
: SubchannelData(subchannel_list, address, subchannel) {}
grpc_connectivity_state connectivity_state() const {
return last_connectivity_state_;
@ -320,6 +319,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
for (size_t i = 0; i < num_subchannels(); i++) {
if (subchannel(i)->subchannel() != nullptr) {
subchannel(i)->StartConnectivityWatchLocked();
subchannel(i)->subchannel()->AttemptToConnect();
}
}
// Now set the LB policy's state based on the subchannels' states.
@ -448,6 +448,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
// Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
// when the subchannel list was created, we'd wind up in a constant
// loop of re-resolution.
// Also attempt to reconnect.
if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO,
@ -456,9 +457,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
p, subchannel());
}
p->channel_control_helper()->RequestReresolution();
subchannel()->AttemptToConnect();
}
// Renew connectivity watch.
RenewConnectivityWatchLocked();
// Update state counters.
UpdateConnectivityStateLocked(connectivity_state);
// Update overall state and renew notification.

@ -98,15 +98,13 @@ class SubchannelData {
// Synchronously checks the subchannel's connectivity state.
// Must not be called while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() or
// RenewConnectivityWatchLocked() and the resulting invocation of
// ProcessConnectivityChangeLocked()).
// pending (i.e., between calling StartConnectivityWatchLocked() and
// calling CancelConnectivityWatchLocked()).
grpc_connectivity_state CheckConnectivityStateLocked() {
GPR_ASSERT(!connectivity_notification_pending_);
pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity(
subchannel_list_->inhibit_health_checking());
UpdateConnectedSubchannelLocked();
return pending_connectivity_state_unsafe_;
GPR_ASSERT(pending_watcher_ == nullptr);
connectivity_state_ = subchannel()->CheckConnectivityState(
subchannel_list_->health_check_service_name(), &connected_subchannel_);
return connectivity_state_;
}
// Resets the connection backoff.
@ -115,23 +113,11 @@ class SubchannelData {
void ResetBackoffLocked();
// Starts watching the connectivity state of the subchannel.
// ProcessConnectivityChangeLocked() will be called when the
// ProcessConnectivityChangeLocked() will be called whenever the
// connectivity state changes.
void StartConnectivityWatchLocked();
// Renews watching the connectivity state of the subchannel.
void RenewConnectivityWatchLocked();
// Stops watching the connectivity state of the subchannel.
void StopConnectivityWatchLocked();
// Cancels watching the connectivity state of the subchannel.
// Must be called only while there is a connectivity notification
// pending (i.e., between calling StartConnectivityWatchLocked() or
// RenewConnectivityWatchLocked() and the resulting invocation of
// ProcessConnectivityChangeLocked()).
// From within ProcessConnectivityChangeLocked(), use
// StopConnectivityWatchLocked() instead.
void CancelConnectivityWatchLocked(const char* reason);
// Cancels any pending connectivity watch and unrefs the subchannel.
@ -142,44 +128,80 @@ class SubchannelData {
protected:
SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner);
const ServerAddress& address, Subchannel* subchannel);
virtual ~SubchannelData();
// After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked()
// is called, this method will be invoked when the subchannel's connectivity
// state changes.
// Implementations must invoke either RenewConnectivityWatchLocked() or
// StopConnectivityWatchLocked() before returning.
// After StartConnectivityWatchLocked() is called, this method will be
// invoked whenever the subchannel's connectivity state changes.
// To stop watching, use CancelConnectivityWatchLocked().
virtual void ProcessConnectivityChangeLocked(
grpc_connectivity_state connectivity_state) GRPC_ABSTRACT;
// Unrefs the subchannel.
void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.
bool UpdateConnectedSubchannelLocked();
// Watcher for subchannel connectivity state.
class Watcher : public Subchannel::ConnectivityStateWatcher {
public:
Watcher(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
RefCountedPtr<SubchannelListType> subchannel_list)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)) {}
~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) override;
grpc_pollset_set* interested_parties() override {
return subchannel_list_->policy()->interested_parties();
}
private:
// A fire-and-forget class that bounces into the combiner to process
// a connectivity state update.
class Updater {
public:
Updater(
SubchannelData<SubchannelListType, SubchannelDataType>*
subchannel_data,
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list,
grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel);
~Updater() {
subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor");
}
private:
static void OnUpdateLocked(void* arg, grpc_error* error);
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list_;
const grpc_connectivity_state state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_closure closure_;
};
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
RefCountedPtr<SubchannelListType> subchannel_list_;
};
// Unrefs the subchannel.
void UnrefSubchannelLocked(const char* reason);
// Backpointer to owning subchannel list. Not owned.
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
// The subchannel and connected subchannel.
// The subchannel.
Subchannel* subchannel_;
// Will be non-null when the subchannel's state is being watched.
Subchannel::ConnectivityStateWatcher* pending_watcher_ = nullptr;
// Data updated by the watcher.
grpc_connectivity_state connectivity_state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Notification that connectivity has changed on subchannel.
grpc_closure connectivity_changed_closure_;
// Is a connectivity notification pending?
bool connectivity_notification_pending_ = false;
// Connectivity state to be updated by
// grpc_subchannel_notify_on_state_change(), not guarded by
// the combiner.
grpc_connectivity_state pending_connectivity_state_unsafe_;
};
// A list of subchannels.
@ -213,7 +235,9 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// Accessors.
LoadBalancingPolicy* policy() const { return policy_; }
TraceFlag* tracer() const { return tracer_; }
bool inhibit_health_checking() const { return inhibit_health_checking_; }
const char* health_check_service_name() const {
return health_check_service_name_.get();
}
// Resets connection backoff of all subchannels.
// TODO(roth): We will probably need to rethink this as part of moving
@ -251,7 +275,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
TraceFlag* tracer_;
bool inhibit_health_checking_;
UniquePtr<char> health_check_service_name_;
grpc_combiner* combiner_;
@ -268,6 +292,67 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
// implementation -- no user-servicable parts below
//
//
// SubchannelData::Watcher
//
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
// Will delete itself.
New<Updater>(subchannel_data_,
subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"),
new_state, std::move(connected_subchannel));
}
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
Updater(
SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
RefCountedPtr<SubchannelList<SubchannelListType, SubchannelDataType>>
subchannel_list,
grpc_connectivity_state state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel)
: subchannel_data_(subchannel_data),
subchannel_list_(std::move(subchannel_list)),
state_(state),
connected_subchannel_(std::move(connected_subchannel)) {
GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this,
grpc_combiner_scheduler(subchannel_list_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::Updater::
OnUpdateLocked(void* arg, grpc_error* error) {
Updater* self = static_cast<Updater*>(arg);
SubchannelData* sd = self->subchannel_data_;
if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: state=%s, "
"connected_subchannel=%p, shutting_down=%d, pending_watcher=%p",
sd->subchannel_list_->tracer()->name(),
sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(),
sd->subchannel_list_->num_subchannels(), sd->subchannel_,
grpc_connectivity_state_name(self->state_),
self->connected_subchannel_.get(),
sd->subchannel_list_->shutting_down(), sd->pending_watcher_);
}
if (!sd->subchannel_list_->shutting_down() &&
sd->pending_watcher_ != nullptr) {
sd->connectivity_state_ = self->state_;
// Get or release ref to connected subchannel.
sd->connected_subchannel_ = std::move(self->connected_subchannel_);
// Call the subclass's ProcessConnectivityChangeLocked() method.
sd->ProcessConnectivityChangeLocked(sd->connectivity_state_);
}
// Clean up.
Delete(self);
}
//
// SubchannelData
//
@ -275,23 +360,16 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
const ServerAddress& address, Subchannel* subchannel)
: subchannel_list_(subchannel_list),
subchannel_(subchannel),
// We assume that the current state is IDLE. If not, we'll get a
// callback telling us that.
pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) {
GRPC_CLOSURE_INIT(
&connectivity_changed_closure_,
(&SubchannelData<SubchannelListType,
SubchannelDataType>::OnConnectivityChangedLocked),
this, grpc_combiner_scheduler(combiner));
}
connectivity_state_(GRPC_CHANNEL_IDLE) {}
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
UnrefSubchannelLocked("subchannel_data_destroy");
GPR_ASSERT(subchannel_ == nullptr);
}
template <typename SubchannelListType, typename SubchannelDataType>
@ -326,56 +404,19 @@ void SubchannelData<SubchannelListType,
if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): starting watch: requesting connectivity change "
"notification (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
}
GPR_ASSERT(!connectivity_notification_pending_);
connectivity_notification_pending_ = true;
subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
subchannel_->NotifyOnStateChange(
subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::RenewConnectivityWatchLocked() {
if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): renewing watch: requesting connectivity change "
"notification (from %s)",
" (subchannel %p): starting watch (from %s)",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
subchannel_, grpc_connectivity_state_name(connectivity_state_));
}
GPR_ASSERT(connectivity_notification_pending_);
subchannel_->NotifyOnStateChange(
subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::StopConnectivityWatchLocked() {
if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): stopping connectivity watch",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_);
}
GPR_ASSERT(connectivity_notification_pending_);
connectivity_notification_pending_ = false;
subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch");
GPR_ASSERT(pending_watcher_ == nullptr);
pending_watcher_ =
New<Watcher>(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
subchannel_->WatchConnectivityState(
connectivity_state_,
UniquePtr<char>(
gpr_strdup(subchannel_list_->health_check_service_name())),
UniquePtr<Subchannel::ConnectivityStateWatcher>(pending_watcher_));
}
template <typename SubchannelListType, typename SubchannelDataType>
@ -389,91 +430,17 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_, reason);
}
GPR_ASSERT(connectivity_notification_pending_);
subchannel_->NotifyOnStateChange(nullptr, nullptr,
&connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
template <typename SubchannelListType, typename SubchannelDataType>
bool SubchannelData<SubchannelListType,
SubchannelDataType>::UpdateConnectedSubchannelLocked() {
// If the subchannel is READY, take a ref to the connected subchannel.
if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
connected_subchannel_ = subchannel_->connected_subchannel();
// If the subchannel became disconnected between the time that READY
// was reported and the time we got here (e.g., between when a
// notification callback is scheduled and when it was actually run in
// the combiner), then the connected subchannel may have disappeared out
// from under us. In that case, we don't actually want to consider the
// subchannel to be in state READY. Instead, we use IDLE as the
// basis for any future connectivity watch; this is the one state that
// the subchannel will never transition back into, so this ensures
// that we will get a notification for the next state, even if that state
// is READY again (e.g., if the subchannel has transitioned back to
// READY before the next watch gets requested).
if (connected_subchannel_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
gpr_log(GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): state is READY but connected subchannel is "
"null; moving to state IDLE",
subchannel_list_->tracer()->name(), subchannel_list_->policy(),
subchannel_list_, Index(), subchannel_list_->num_subchannels(),
subchannel_);
}
pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE;
return false;
}
} else {
// For any state other than READY, unref the connected subchannel.
connected_subchannel_.reset();
}
return true;
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::
OnConnectivityChangedLocked(void* arg, grpc_error* error) {
SubchannelData* sd = static_cast<SubchannelData*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) {
gpr_log(
GPR_INFO,
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
" (subchannel %p): connectivity changed: state=%s, error=%s, "
"shutting_down=%d",
sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(),
sd->subchannel_list_, sd->Index(),
sd->subchannel_list_->num_subchannels(), sd->subchannel_,
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_),
grpc_error_string(error), sd->subchannel_list_->shutting_down());
}
// If shutting down, unref subchannel and stop watching.
if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) {
sd->UnrefSubchannelLocked("connectivity_shutdown");
sd->StopConnectivityWatchLocked();
return;
if (pending_watcher_ != nullptr) {
subchannel_->CancelConnectivityStateWatch(
subchannel_list_->health_check_service_name(), pending_watcher_);
pending_watcher_ = nullptr;
}
// Get or release ref to connected subchannel.
if (!sd->UpdateConnectedSubchannelLocked()) {
// We don't want to report this connectivity state, so renew the watch.
sd->RenewConnectivityWatchLocked();
return;
}
// Call the subclass's ProcessConnectivityChangeLocked() method.
sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_);
}
template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
// If there's a pending notification for this subchannel, cancel it;
// the callback is responsible for unreffing the subchannel.
// Otherwise, unref the subchannel directly.
if (connectivity_notification_pending_) {
CancelConnectivityWatchLocked("shutdown");
} else if (subchannel_ != nullptr) {
UnrefSubchannelLocked("shutdown");
}
if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
UnrefSubchannelLocked("shutdown");
}
//
@ -496,14 +463,25 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
tracer_->name(), policy, this, addresses.size());
}
subchannels_.reserve(addresses.size());
// Find health check service name.
const bool inhibit_health_checking = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
if (!inhibit_health_checking) {
const char* health_check_service_name = grpc_channel_arg_get_string(
grpc_channel_args_find(&args, "grpc.temp.health_check"));
if (health_check_service_name != nullptr) {
health_check_service_name_.reset(gpr_strdup(health_check_service_name));
}
}
// We need to remove the LB addresses in order to be able to compare the
// subchannel keys of subchannels from a different batch of addresses.
// We also remove the inhibit-health-checking arg, since we are
// We also remove the health-checking-related args, since we are
// handling that here.
inhibit_health_checking_ = grpc_channel_arg_get_bool(
grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
GRPC_ARG_INHIBIT_HEALTH_CHECKING};
// We remove the service config, since it will be passed into the
// subchannel via call context.
static const char* keys_to_remove[] = {
GRPC_ARG_SUBCHANNEL_ADDRESS, "grpc.temp.health_check",
GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_SERVICE_CONFIG};
// Create a subchannel for each address.
for (size_t i = 0; i < addresses.size(); i++) {
// TODO(roth): we should ideally hide this from the LB policy code. In
@ -549,7 +527,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
address_uri);
gpr_free(address_uri);
}
subchannels_.emplace_back(this, addresses[i], subchannel, combiner);
subchannels_.emplace_back(this, addresses[i], subchannel);
}
}

@ -536,7 +536,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
if (process_resolver_result_ != nullptr) {
grpc_error* service_config_error = GRPC_ERROR_NONE;
service_config_changed = process_resolver_result_(
process_resolver_result_user_data_, result, &lb_policy_name,
process_resolver_result_user_data_, &result, &lb_policy_name,
&lb_policy_config, &service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
service_config_error_string =

@ -69,8 +69,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// empty, it means that we don't have a valid service config to use, and we
// should set the channel to be in TRANSIENT_FAILURE.
typedef bool (*ProcessResolverResultCallback)(
void* user_data, const Resolver::Result& result,
const char** lb_policy_name,
void* user_data, Resolver::Result* result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);
// If error is set when this returns, then construction failed, and

@ -303,8 +303,7 @@ void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location,
// Subchannel::ConnectedSubchannelStateWatcher
//
class Subchannel::ConnectedSubchannelStateWatcher
: public InternallyRefCounted<ConnectedSubchannelStateWatcher> {
class Subchannel::ConnectedSubchannelStateWatcher {
public:
// Must be instantiated while holding c->mu.
explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
@ -312,38 +311,17 @@ class Subchannel::ConnectedSubchannelStateWatcher
GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
// Start watching for connectivity state changes.
// Callback uses initial ref to this.
GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this,
grpc_schedule_on_exec_ctx);
c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_,
&pending_connectivity_state_,
&on_connectivity_changed_);
// Start health check if needed.
grpc_connectivity_state health_state = GRPC_CHANNEL_READY;
if (c->health_check_service_name_ != nullptr) {
health_check_client_ = MakeOrphanable<HealthCheckClient>(
c->health_check_service_name_.get(), c->connected_subchannel_,
c->pollset_set_, c->channelz_node_);
GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
grpc_schedule_on_exec_ctx);
Ref().release(); // Ref for health callback tracked manually.
health_check_client_->NotifyOnHealthChange(&health_state_,
&on_health_changed_);
health_state = GRPC_CHANNEL_CONNECTING;
}
// Report initial state.
c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, "subchannel_connected");
grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state,
"subchannel_connected");
}
~ConnectedSubchannelStateWatcher() {
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
}
// Must be called while holding subchannel_->mu.
void Orphan() override { health_check_client_.reset(); }
private:
static void OnConnectivityChanged(void* arg, grpc_error* error) {
auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
@ -363,20 +341,10 @@ class Subchannel::ConnectedSubchannelStateWatcher
self->pending_connectivity_state_));
}
c->connected_subchannel_.reset();
c->connected_subchannel_watcher_.reset();
self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE;
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
"reflect_child");
grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"reflect_child");
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
c->backoff_begun_ = false;
c->backoff_.Reset();
c->MaybeStartConnectingLocked();
} else {
self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN;
}
self->health_check_client_.reset();
break;
}
default: {
@ -384,96 +352,246 @@ class Subchannel::ConnectedSubchannelStateWatcher
// a callback for READY, because that was the state we started
// this watch from. And a connected subchannel should never go
// from READY to CONNECTING or IDLE.
self->last_connectivity_state_ = self->pending_connectivity_state_;
c->SetConnectivityStateLocked(self->pending_connectivity_state_,
"reflect_child");
if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) {
grpc_connectivity_state_set(&c->state_and_health_tracker_,
self->pending_connectivity_state_,
"reflect_child");
}
c->SetConnectivityStateLocked(self->pending_connectivity_state_);
c->connected_subchannel_->NotifyOnStateChange(
nullptr, &self->pending_connectivity_state_,
&self->on_connectivity_changed_);
self = nullptr; // So we don't unref below.
return; // So we don't delete ourself below.
}
}
}
// Don't unref until we've released the lock, because this might
// Don't delete until we've released the lock, because this might
// cause the subchannel (which contains the lock) to be destroyed.
if (self != nullptr) self->Unref();
Delete(self);
}
Subchannel* subchannel_;
grpc_closure on_connectivity_changed_;
grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY;
};
//
// Subchannel::ConnectivityStateWatcherList
//
// Takes ownership of watcher and pushes it onto the front of the
// intrusive singly-linked list.
void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
    UniquePtr<ConnectivityStateWatcher> watcher) {
  ConnectivityStateWatcher* new_head = watcher.release();
  new_head->next_ = head_;
  head_ = new_head;
}
// Unlinks and deletes the given watcher. The watcher must be present in
// the list; reaching the end without finding it is a bug.
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
    ConnectivityStateWatcher* watcher) {
  // Walk via a pointer-to-link so head and interior removals are uniform.
  ConnectivityStateWatcher** link = &head_;
  while (*link != nullptr) {
    if (*link == watcher) {
      *link = watcher->next_;
      Delete(watcher);
      return;
    }
    link = &(*link)->next_;
  }
  GPR_UNREACHABLE_CODE(return );
}
// Reports the new state to every watcher in the list.
void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
    Subchannel* subchannel, grpc_connectivity_state state) {
  for (ConnectivityStateWatcher* watcher = head_; watcher != nullptr;
       watcher = watcher->next_) {
    // Hand each watcher its own ref to the connected subchannel, but
    // only when reporting READY; otherwise pass a null ref.
    RefCountedPtr<ConnectedSubchannel> connected;
    if (state == GRPC_CHANNEL_READY) {
      connected = subchannel->connected_subchannel_;
    }
    // TODO(roth): In principle, it seems wrong to notify the watcher
    // while holding the subchannel's mutex, since it could deadlock if
    // the watcher called back into the subchannel before returning.  In
    // practice this doesn't happen, because the LB policy code that
    // watches subchannels always bounces the notification into the
    // client_channel control-plane combiner before processing it.  But
    // if we ever have other callers here, this will need to change.
    watcher->OnConnectivityStateChange(state, std::move(connected));
  }
}
void Subchannel::ConnectivityStateWatcherList::Clear() {
while (head_ != nullptr) {
ConnectivityStateWatcher* next = head_->next_;
Delete(head_);
head_ = next;
}
}
//
// Subchannel::HealthWatcherMap::HealthWatcher
//
// State needed for tracking the connectivity state with a particular
// health check service name.
class Subchannel::HealthWatcherMap::HealthWatcher
    : public InternallyRefCounted<HealthWatcher> {
 public:
  // Must be constructed while holding the subchannel's mu_.
  // If the subchannel is already READY, the reported health state starts
  // at CONNECTING (health checking has not yet confirmed serving status);
  // otherwise it mirrors the subchannel's raw state.
  HealthWatcher(Subchannel* c, UniquePtr<char> health_check_service_name,
                grpc_connectivity_state subchannel_state)
      : subchannel_(c),
        health_check_service_name_(std::move(health_check_service_name)),
        state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
                                                      : subchannel_state) {
    GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher");
    GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this,
                      grpc_schedule_on_exec_ctx);
    // If the subchannel is already connected, start health checking.
    if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
  }

  ~HealthWatcher() {
    GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher");
  }

  const char* health_check_service_name() const {
    return health_check_service_name_.get();
  }

  grpc_connectivity_state state() const { return state_; }

  // Adds a watcher.  If our current state differs from the watcher's
  // initial_state, immediately delivers a notification (with a ref to
  // the connected subchannel when the state is READY).
  void AddWatcherLocked(grpc_connectivity_state initial_state,
                        UniquePtr<ConnectivityStateWatcher> watcher) {
    if (state_ != initial_state) {
      RefCountedPtr<ConnectedSubchannel> connected_subchannel;
      if (state_ == GRPC_CHANNEL_READY) {
        connected_subchannel = subchannel_->connected_subchannel_;
      }
      watcher->OnConnectivityStateChange(state_,
                                         std::move(connected_subchannel));
    }
    watcher_list_.AddWatcherLocked(std::move(watcher));
  }

  void RemoveWatcherLocked(ConnectivityStateWatcher* watcher) {
    watcher_list_.RemoveWatcherLocked(watcher);
  }

  bool HasWatchers() const { return !watcher_list_.empty(); }

  // Called when the subchannel's raw connectivity state changes.
  void NotifyLocked(grpc_connectivity_state state) {
    if (state == GRPC_CHANNEL_READY) {
      // If we had not already notified for CONNECTING state, do so now.
      // (We may have missed this earlier, because if the transition
      // from IDLE to CONNECTING to READY was too quick, the connected
      // subchannel may not have sent us a notification for CONNECTING.)
      if (state_ != GRPC_CHANNEL_CONNECTING) {
        state_ = GRPC_CHANNEL_CONNECTING;
        watcher_list_.NotifyLocked(subchannel_, state_);
      }
      // If we've become connected, start health checking.
      StartHealthCheckingLocked();
    } else {
      state_ = state;
      watcher_list_.NotifyLocked(subchannel_, state_);
      // We're not connected, so stop health checking.
      health_check_client_.reset();
    }
  }

  void Orphan() override {
    watcher_list_.Clear();
    health_check_client_.reset();
    Unref();
  }

 private:
  void StartHealthCheckingLocked() {
    GPR_ASSERT(health_check_client_ == nullptr);
    health_check_client_ = MakeOrphanable<HealthCheckClient>(
        health_check_service_name_.get(), subchannel_->connected_subchannel_,
        subchannel_->pollset_set_, subchannel_->channelz_node_);
    Ref().release();  // Ref for health callback tracked manually.
    health_check_client_->NotifyOnHealthChange(&state_, &on_health_changed_);
  }

  static void OnHealthChanged(void* arg, grpc_error* error) {
    auto* self = static_cast<HealthWatcher*>(arg);
    Subchannel* c = self->subchannel_;
    {
      MutexLock lock(&c->mu_);
      if (self->state_ != GRPC_CHANNEL_SHUTDOWN &&
          self->health_check_client_ != nullptr) {
        self->watcher_list_.NotifyLocked(c, self->state_);
        // Renew watch.
        self->health_check_client_->NotifyOnHealthChange(
            &self->state_, &self->on_health_changed_);
        return;  // So we don't unref below.
      }
    }
    // Don't unref until we've released the lock, because this might
    // cause the subchannel (which contains the lock) to be destroyed.
    self->Unref();
  }

  Subchannel* subchannel_;
  UniquePtr<char> health_check_service_name_;
  OrphanablePtr<HealthCheckClient> health_check_client_;
  grpc_closure on_health_changed_;
  grpc_connectivity_state state_;
  ConnectivityStateWatcherList watcher_list_;
};
//
// Subchannel::ExternalStateWatcher
// Subchannel::HealthWatcherMap
//
struct Subchannel::ExternalStateWatcher {
ExternalStateWatcher(Subchannel* subchannel, grpc_pollset_set* pollset_set,
grpc_closure* notify)
: subchannel(subchannel), pollset_set(pollset_set), notify(notify) {
GRPC_SUBCHANNEL_WEAK_REF(subchannel, "external_state_watcher+init");
GRPC_CLOSURE_INIT(&on_state_changed, OnStateChanged, this,
grpc_schedule_on_exec_ctx);
// Registers a watcher under the given health check service name,
// creating the per-service-name HealthWatcher entry on first use.
void Subchannel::HealthWatcherMap::AddWatcherLocked(
    Subchannel* subchannel, grpc_connectivity_state initial_state,
    UniquePtr<char> health_check_service_name,
    UniquePtr<ConnectivityStateWatcher> watcher) {
  HealthWatcher* health_watcher = nullptr;
  auto it = map_.find(health_check_service_name.get());
  if (it != map_.end()) {
    health_watcher = it->second.get();
  } else {
    // Note: the map key points into the string owned by the new
    // HealthWatcher, so grab it before moving the name.
    const char* key = health_check_service_name.get();
    auto new_watcher = MakeOrphanable<HealthWatcher>(
        subchannel, std::move(health_check_service_name), subchannel->state_);
    health_watcher = new_watcher.get();
    map_[key] = std::move(new_watcher);
  }
  // Add the watcher to the entry.
  health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
}
void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
const char* health_check_service_name, ConnectivityStateWatcher* watcher) {
auto it = map_.find(health_check_service_name);
GPR_ASSERT(it != map_.end());
it->second->RemoveWatcherLocked(watcher);
// If we just removed the last watcher for this service name, remove
// the map entry.
if (!it->second->HasWatchers()) map_.erase(it);
}
// Propagates a raw connectivity state change to every per-service-name
// health watcher.
void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state) {
  for (auto& entry : map_) {
    entry.second->NotifyLocked(state);
  }
}
static void OnStateChanged(void* arg, grpc_error* error) {
ExternalStateWatcher* w = static_cast<ExternalStateWatcher*>(arg);
grpc_closure* follow_up = w->notify;
if (w->pollset_set != nullptr) {
grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_,
w->pollset_set);
}
{
MutexLock lock(&w->subchannel->mu_);
if (w->subchannel->external_state_watcher_list_ == w) {
w->subchannel->external_state_watcher_list_ = w->next;
}
if (w->next != nullptr) w->next->prev = w->prev;
if (w->prev != nullptr) w->prev->next = w->next;
}
GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done");
Delete(w);
GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
// Returns the connectivity state as seen through the health checker for
// the given service name.
grpc_connectivity_state
Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
    Subchannel* subchannel, const char* health_check_service_name) {
  auto it = map_.find(health_check_service_name);
  if (it != map_.end()) return it->second->state();
  // No health check is currently running for this service name.  If the
  // subchannel's raw state is READY, report CONNECTING, since that's
  // what we'd be in as soon as we did start a watch.  Otherwise, report
  // the raw state as-is.
  if (subchannel->state_ == GRPC_CHANNEL_READY) return GRPC_CHANNEL_CONNECTING;
  return subchannel->state_;
}
Subchannel* subchannel;
grpc_pollset_set* pollset_set;
grpc_closure* notify;
grpc_closure on_state_changed;
ExternalStateWatcher* next = nullptr;
ExternalStateWatcher* prev = nullptr;
};
// Destroys all per-service-name health watchers (each entry's Orphan()
// clears its watcher list and stops its health check client).
void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
//
// Subchannel
@ -560,13 +678,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
if (new_args != nullptr) grpc_channel_args_destroy(new_args);
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
"subchannel");
grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
"subchannel");
health_check_service_name_ =
UniquePtr<char>(gpr_strdup(grpc_channel_arg_get_string(
grpc_channel_args_find(args_, "grpc.temp.health_check"))));
const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
const bool channelz_enabled =
grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
@ -593,8 +704,6 @@ Subchannel::~Subchannel() {
channelz_node_->MarkSubchannelDestroyed();
}
grpc_channel_args_destroy(args_);
grpc_connectivity_state_destroy(&state_tracker_);
grpc_connectivity_state_destroy(&state_and_health_tracker_);
grpc_connector_unref(connector_);
grpc_pollset_set_destroy(pollset_set_);
Delete(key_);
@ -698,55 +807,67 @@ const char* Subchannel::GetTargetAddress() {
return addr_str;
}
// Returns a ref to the current connected subchannel, which may be null
// when not connected (it is reset on disconnect elsewhere in this file).
// Takes mu_ so the read is consistent with concurrent state changes.
RefCountedPtr<ConnectedSubchannel> Subchannel::connected_subchannel() {
  MutexLock lock(&mu_);
  return connected_subchannel_;
}
// Returns the channelz node for this subchannel (may be null).  The
// caller does not take ownership.
channelz::SubchannelNode* Subchannel::channelz_node() {
  return channelz_node_.get();
}
// Returns the subchannel's current connectivity state.
// If health_check_service_name is non-null, the state is reported as
// seen through the health checker for that service name; otherwise the
// raw connectivity state is reported.
// If the state is READY and connected_subchannel is non-null, also
// hands back a ref to the connected subchannel.
// (Removed: stale pre-merge CheckConnectivity(bool) fragment that left
// two function headers fused together here.)
grpc_connectivity_state Subchannel::CheckConnectivityState(
    const char* health_check_service_name,
    RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
  MutexLock lock(&mu_);
  grpc_connectivity_state state;
  if (health_check_service_name == nullptr) {
    state = state_;
  } else {
    state = health_watcher_map_.CheckConnectivityStateLocked(
        this, health_check_service_name);
  }
  if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
    *connected_subchannel = connected_subchannel_;
  }
  return state;
}
void Subchannel::NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state,
grpc_closure* notify,
bool inhibit_health_checking) {
grpc_connectivity_state_tracker* tracker =
inhibit_health_checking ? &state_tracker_ : &state_and_health_tracker_;
ExternalStateWatcher* w;
if (state == nullptr) {
MutexLock lock(&mu_);
for (w = external_state_watcher_list_; w != nullptr; w = w->next) {
if (w->notify == notify) {
grpc_connectivity_state_notify_on_state_change(tracker, nullptr,
&w->on_state_changed);
}
void Subchannel::WatchConnectivityState(
grpc_connectivity_state initial_state,
UniquePtr<char> health_check_service_name,
UniquePtr<ConnectivityStateWatcher> watcher) {
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
if (health_check_service_name == nullptr) {
if (state_ != initial_state) {
watcher->OnConnectivityStateChange(state_, connected_subchannel_);
}
watcher_list_.AddWatcherLocked(std::move(watcher));
} else {
w = New<ExternalStateWatcher>(this, interested_parties, notify);
if (interested_parties != nullptr) {
grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
}
MutexLock lock(&mu_);
if (external_state_watcher_list_ != nullptr) {
w->next = external_state_watcher_list_;
w->next->prev = w;
}
external_state_watcher_list_ = w;
grpc_connectivity_state_notify_on_state_change(tracker, state,
&w->on_state_changed);
MaybeStartConnectingLocked();
health_watcher_map_.AddWatcherLocked(this, initial_state,
std::move(health_check_service_name),
std::move(watcher));
}
}
void Subchannel::CancelConnectivityStateWatch(
const char* health_check_service_name, ConnectivityStateWatcher* watcher) {
MutexLock lock(&mu_);
grpc_pollset_set* interested_parties = watcher->interested_parties();
if (interested_parties != nullptr) {
grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
}
if (health_check_service_name == nullptr) {
watcher_list_.RemoveWatcherLocked(watcher);
} else {
health_watcher_map_.RemoveWatcherLocked(health_check_service_name, watcher);
}
}
// Public entry point to trigger a connection attempt.  Grabs mu_ and
// delegates to MaybeStartConnectingLocked(), which bails out if a
// connection already exists or an attempt is in progress.
void Subchannel::AttemptToConnect() {
  MutexLock lock(&mu_);
  MaybeStartConnectingLocked();
}
void Subchannel::ResetBackoff() {
MutexLock lock(&mu_);
backoff_.Reset();
@ -818,15 +939,19 @@ const char* SubchannelConnectivityStateChangeString(
} // namespace
// Sets the subchannel's raw connectivity state and notifies all
// watchers (both the plain list and the health watcher map).
// Note: Must be called with a state that is different from the current
// state.
// (Removed: stale pre-merge two-argument signature and the
// grpc_connectivity_state_set(&state_tracker_, ...) call, which
// referenced the deleted state-tracker member and a 'reason' parameter
// that no longer exists.)
void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state) {
  state_ = state;
  if (channelz_node_ != nullptr) {
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            SubchannelConnectivityStateChangeString(state)));
  }
  // Notify non-health watchers.
  watcher_list_.NotifyLocked(this, state);
  // Notify health watchers.
  health_watcher_map_.NotifyLocked(state);
}
void Subchannel::MaybeStartConnectingLocked() {
@ -842,11 +967,6 @@ void Subchannel::MaybeStartConnectingLocked() {
// Already connected: don't restart.
return;
}
if (!grpc_connectivity_state_has_watchers(&state_tracker_) &&
!grpc_connectivity_state_has_watchers(&state_and_health_tracker_)) {
// Nobody is interested in connecting: so don't just yet.
return;
}
connecting_ = true;
GRPC_SUBCHANNEL_WEAK_REF(this, "connecting");
if (!backoff_begun_) {
@ -903,9 +1023,7 @@ void Subchannel::ContinueConnectingLocked() {
next_attempt_deadline_ = backoff_.NextAttemptTime();
args.deadline = std::max(next_attempt_deadline_, min_deadline);
args.channel_args = args_;
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, "connecting");
grpc_connectivity_state_set(&state_and_health_tracker_,
GRPC_CHANNEL_CONNECTING, "connecting");
SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING);
grpc_connector_connect(connector_, &args, &connecting_result_,
&on_connecting_finished_);
}
@ -924,12 +1042,7 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} else {
gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed");
c->MaybeStartConnectingLocked();
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
}
}
@ -982,8 +1095,9 @@ bool Subchannel::PublishTransportLocked() {
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
connected_subchannel_.get(), this);
// Instantiate state watcher. Will clean itself up.
connected_subchannel_watcher_ =
MakeOrphanable<ConnectedSubchannelStateWatcher>(this);
New<ConnectedSubchannelStateWatcher>(this);
// Report initial state.
SetConnectivityStateLocked(GRPC_CHANNEL_READY);
return true;
}
@ -1000,7 +1114,7 @@ void Subchannel::Disconnect() {
grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Subchannel disconnected"));
connected_subchannel_.reset();
connected_subchannel_watcher_.reset();
health_watcher_map_.ShutdownLocked();
}
gpr_atm Subchannel::RefMutate(

@ -27,6 +27,7 @@
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -77,7 +78,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_millis deadline;
Arena* arena;
grpc_call_context_element* context;
grpc_core::CallCombiner* call_combiner;
CallCombiner* call_combiner;
size_t parent_data_size;
};
@ -175,7 +176,38 @@ class SubchannelCall {
// A subchannel that knows how to connect to exactly one target address. It
// provides a target for load balancing.
class Subchannel {
private:
class ConnectivityStateWatcherList; // Forward declaration.
public:
class ConnectivityStateWatcher {
public:
virtual ~ConnectivityStateWatcher() = default;
// Will be invoked whenever the subchannel's connectivity state
// changes. There will be only one invocation of this method on a
// given watcher instance at any given time.
//
// When the state changes to READY, connected_subchannel will
// contain a ref to the connected subchannel. When it changes from
// READY to some other state, the implementation must release its
// ref to the connected subchannel.
virtual void OnConnectivityStateChange(
grpc_connectivity_state new_state,
RefCountedPtr<ConnectedSubchannel> connected_subchannel) // NOLINT
GRPC_ABSTRACT;
virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT;
GRPC_ABSTRACT_BASE_CLASS
private:
// For access to next_.
friend class Subchannel::ConnectivityStateWatcherList;
ConnectivityStateWatcher* next_ = nullptr;
};
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
const grpc_channel_args* args);
@ -201,20 +233,36 @@ class Subchannel {
// Caller doesn't take ownership.
const char* GetTargetAddress();
// Gets the connected subchannel - or nullptr if not connected (which may
// happen before it initially connects or during transient failures).
RefCountedPtr<ConnectedSubchannel> connected_subchannel();
channelz::SubchannelNode* channelz_node();
// Polls the current connectivity state of the subchannel.
grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking);
// When the connectivity state of the subchannel changes from \a *state,
// invokes \a notify and updates \a *state with the new state.
void NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify,
bool inhibit_health_checking);
// Returns the current connectivity state of the subchannel.
// If health_check_service_name is non-null, the returned connectivity
// state will be based on the state reported by the backend for that
// service name.
// If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
grpc_connectivity_state CheckConnectivityState(
const char* health_check_service_name,
RefCountedPtr<ConnectedSubchannel>* connected_subchannel);
// Starts watching the subchannel's connectivity state.
// The first callback to the watcher will be delivered when the
// subchannel's connectivity state becomes a value other than
// initial_state, which may happen immediately.
// Subsequent callbacks will be delivered as the subchannel's state
// changes.
// The watcher will be destroyed either when the subchannel is
// destroyed or when CancelConnectivityStateWatch() is called.
void WatchConnectivityState(grpc_connectivity_state initial_state,
UniquePtr<char> health_check_service_name,
UniquePtr<ConnectivityStateWatcher> watcher);
// Cancels a connectivity state watch.
// If the watcher has already been destroyed, this is a no-op.
void CancelConnectivityStateWatch(const char* health_check_service_name,
ConnectivityStateWatcher* watcher);
// Attempt to connect to the backend. Has no effect if already connected.
void AttemptToConnect();
// Resets the connection backoff of the subchannel.
// TODO(roth): Move connection backoff out of subchannels and up into LB
@ -236,12 +284,62 @@ class Subchannel {
grpc_resolved_address* addr);
private:
struct ExternalStateWatcher;
// A linked list of ConnectivityStateWatchers that are monitoring the
// subchannel's state.
class ConnectivityStateWatcherList {
public:
~ConnectivityStateWatcherList() { Clear(); }
void AddWatcherLocked(UniquePtr<ConnectivityStateWatcher> watcher);
void RemoveWatcherLocked(ConnectivityStateWatcher* watcher);
// Notifies all watchers in the list about a change to state.
void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state);
void Clear();
bool empty() const { return head_ == nullptr; }
private:
ConnectivityStateWatcher* head_ = nullptr;
};
// A map that tracks ConnectivityStateWatchers using a particular health
// check service name.
//
// There is one entry in the map for each health check service name.
// Entries exist only as long as there are watchers using the
// corresponding service name.
//
// A health check client is maintained only while the subchannel is in
// state READY.
class HealthWatcherMap {
public:
void AddWatcherLocked(Subchannel* subchannel,
grpc_connectivity_state initial_state,
UniquePtr<char> health_check_service_name,
UniquePtr<ConnectivityStateWatcher> watcher);
void RemoveWatcherLocked(const char* health_check_service_name,
ConnectivityStateWatcher* watcher);
// Notifies the watcher when the subchannel's state changes.
void NotifyLocked(grpc_connectivity_state state);
grpc_connectivity_state CheckConnectivityStateLocked(
Subchannel* subchannel, const char* health_check_service_name);
void ShutdownLocked();
private:
class HealthWatcher;
Map<const char*, OrphanablePtr<HealthWatcher>, StringLess> map_;
};
class ConnectedSubchannelStateWatcher;
// Sets the subchannel's connectivity state to \a state.
void SetConnectivityStateLocked(grpc_connectivity_state state,
const char* reason);
void SetConnectivityStateLocked(grpc_connectivity_state state);
// Methods for connection.
void MaybeStartConnectingLocked();
@ -279,15 +377,15 @@ class Subchannel {
grpc_closure on_connecting_finished_;
// Active connection, or null.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
bool connecting_ = false;
bool disconnected_ = false;
// Connectivity state tracking.
grpc_connectivity_state_tracker state_tracker_;
grpc_connectivity_state_tracker state_and_health_tracker_;
UniquePtr<char> health_check_service_name_;
ExternalStateWatcher* external_state_watcher_list_ = nullptr;
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
// The list of watchers without a health check service name.
ConnectivityStateWatcherList watcher_list_;
// The map of watchers with health check service names.
HealthWatcherMap health_watcher_map_;
// Backoff state.
BackOff backoff_;

@ -1019,8 +1019,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
std::vector<int> ports;
// Start with a single server.
gpr_log(GPR_INFO, "*** FIRST BACKEND ***");
ports.emplace_back(servers_[0]->port_);
SetNextResolution(ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
@ -1030,36 +1030,33 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
servers_[0]->service_.ResetCounters();
// And now for the second server.
gpr_log(GPR_INFO, "*** SECOND BACKEND ***");
ports.clear();
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
// Wait until update has been processed, as signaled by the second backend
// receiving a request.
EXPECT_EQ(0, servers_[1]->service_.request_count());
WaitForServer(stub, 1, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(10, servers_[1]->service_.request_count());
EXPECT_EQ(0, servers_[2]->service_.request_count());
servers_[1]->service_.ResetCounters();
// ... and for the last server.
gpr_log(GPR_INFO, "*** THIRD BACKEND ***");
ports.clear();
ports.emplace_back(servers_[2]->port_);
SetNextResolution(ports);
WaitForServer(stub, 2, DEBUG_LOCATION);
for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(0, servers_[0]->service_.request_count());
EXPECT_EQ(0, servers_[1]->service_.request_count());
EXPECT_EQ(10, servers_[2]->service_.request_count());
servers_[2]->service_.ResetCounters();
// Back to all servers.
gpr_log(GPR_INFO, "*** ALL BACKENDS ***");
ports.clear();
ports.emplace_back(servers_[0]->port_);
ports.emplace_back(servers_[1]->port_);
@ -1068,14 +1065,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
WaitForServer(stub, 0, DEBUG_LOCATION);
WaitForServer(stub, 1, DEBUG_LOCATION);
WaitForServer(stub, 2, DEBUG_LOCATION);
// Send three RPCs, one per server.
for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[0]->service_.request_count());
EXPECT_EQ(1, servers_[1]->service_.request_count());
EXPECT_EQ(1, servers_[2]->service_.request_count());
// An empty update will result in the channel going into TRANSIENT_FAILURE.
gpr_log(GPR_INFO, "*** NO BACKENDS ***");
ports.clear();
SetNextResolution(ports);
grpc_connectivity_state channel_state;
@ -1084,15 +1080,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
} while (channel_state == GRPC_CHANNEL_READY);
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
// Next update introduces servers_[1], making the channel recover.
gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***");
ports.clear();
ports.emplace_back(servers_[1]->port_);
SetNextResolution(ports);
WaitForServer(stub, 1, DEBUG_LOCATION);
channel_state = channel->GetState(false /* try to connect */);
ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
}
@ -1211,8 +1206,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
auto channel = BuildChannel("round_robin");
auto stub = BuildStub(channel);
SetNextResolution(ports);
for (size_t i = 0; i < kNumServers; ++i)
for (size_t i = 0; i < kNumServers; ++i) {
WaitForServer(stub, i, DEBUG_LOCATION);
}
for (size_t i = 0; i < servers_.size(); ++i) {
CheckRpcSendOk(stub, DEBUG_LOCATION);
EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i;
@ -1236,7 +1232,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
// Bring the first server back up.
servers_[0].reset(new ServerData(ports[0]));
StartServer(0);
// Requests should start arriving at the first server either right away (if
// the server managed to start before the RR policy retried the subchannel) or
@ -1360,6 +1355,52 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) {
// Second channel should be READY.
EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1));
CheckRpcSendOk(stub2, DEBUG_LOCATION);
// Enable health checks on the backend and wait for channel 1 to succeed.
servers_[0]->SetServingStatus("health_check_service_name", true);
CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */);
// Check that we created only one subchannel to the backend.
EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
// Clean up.
EnableDefaultHealthCheckService(false);
}
TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) {
  EnableDefaultHealthCheckService(true);
  // Bring up a single backend.
  const int kNumServers = 1;
  StartServers(kNumServers);
  std::vector<int> server_ports = GetServersPorts();
  // Builds a round_robin channel whose health checking is configured via
  // the given service config JSON.
  auto make_health_checked_channel = [&](const char* service_config_json) {
    ChannelArguments channel_args;
    channel_args.SetServiceConfigJSON(service_config_json);
    return BuildChannel("round_robin", channel_args);
  };
  // Channel A: health checking enabled for "health_check_service_name".
  auto channel_a = make_health_checked_channel(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name\"}}");
  auto stub_a = BuildStub(channel_a);
  SetNextResolution(server_ports);
  // Channel B: health checking enabled for a different service name.
  auto channel_b = make_health_checked_channel(
      "{\"healthCheckConfig\": "
      "{\"serviceName\": \"health_check_service_name2\"}}");
  auto stub_b = BuildStub(channel_b);
  SetNextResolution(server_ports);
  // Only channel B's health check service is marked as serving.
  servers_[0]->SetServingStatus("health_check_service_name2", true);
  // Channel A must not become READY, since its health checks are failing.
  EXPECT_FALSE(WaitForChannelReady(channel_a.get(), 1));
  CheckRpcSendFailure(stub_a);
  // Channel B becomes READY and can issue RPCs.
  EXPECT_TRUE(WaitForChannelReady(channel_b.get(), 1));
  CheckRpcSendOk(stub_b, DEBUG_LOCATION);
  // Now mark channel A's health check service as serving and wait for its
  // RPCs to start succeeding.
  servers_[0]->SetServingStatus("health_check_service_name", true);
  CheckRpcSendOk(stub_a, DEBUG_LOCATION, true /* wait_for_ready */);
  // Both channels must have shared a single subchannel to the backend.
  EXPECT_EQ(1UL, servers_[0]->service_.clients().size());
  // Clean up.
  EnableDefaultHealthCheckService(false);
}

Loading…
Cancel
Save