diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index f774c985c30..757ac2a35e7 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -222,7 +222,7 @@ class ChannelData { ~ChannelData(); static bool ProcessResolverResultLocked( - void* arg, const Resolver::Result& result, const char** lb_policy_name, + void* arg, Resolver::Result* result, const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error); @@ -271,7 +271,6 @@ class ChannelData { OrphanablePtr resolving_lb_policy_; grpc_connectivity_state_tracker state_tracker_; ExternalConnectivityWatcher::WatcherList external_connectivity_watcher_list_; - UniquePtr health_check_service_name_; RefCountedPtr saved_service_config_; bool received_first_resolver_result_ = false; @@ -951,18 +950,10 @@ class ChannelData::ClientChannelControlHelper } Subchannel* CreateSubchannel(const grpc_channel_args& args) override { - grpc_arg args_to_add[2]; - int num_args_to_add = 0; - if (chand_->health_check_service_name_ != nullptr) { - args_to_add[0] = grpc_channel_arg_string_create( - const_cast("grpc.temp.health_check"), - const_cast(chand_->health_check_service_name_.get())); - num_args_to_add++; - } - args_to_add[num_args_to_add++] = SubchannelPoolInterface::CreateChannelArg( + grpc_arg arg = SubchannelPoolInterface::CreateChannelArg( chand_->subchannel_pool_.get()); grpc_channel_args* new_args = - grpc_channel_args_copy_and_add(&args, args_to_add, num_args_to_add); + grpc_channel_args_copy_and_add(&args, &arg, 1); Subchannel* subchannel = chand_->client_channel_factory_->CreateSubchannel(new_args); grpc_channel_args_destroy(new_args); @@ -1201,14 +1192,14 @@ void ChannelData::ProcessLbPolicy( // Synchronous callback from ResolvingLoadBalancingPolicy to process a // resolver result update. bool ChannelData::ProcessResolverResultLocked( - void* arg, const Resolver::Result& result, const char** lb_policy_name, + void* arg, Resolver::Result* result, const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error) { ChannelData* chand = static_cast(arg); RefCountedPtr service_config; // If resolver did not return a service config or returned an invalid service // config, we need a fallback service config. - if (result.service_config_error != GRPC_ERROR_NONE) { + if (result->service_config_error != GRPC_ERROR_NONE) { // If the service config was invalid, then fallback to the saved service // config. If there is no saved config either, use the default service // config. @@ -1229,7 +1220,7 @@ bool ChannelData::ProcessResolverResultLocked( } service_config = chand->default_service_config_; } - } else if (result.service_config == nullptr) { + } else if (result->service_config == nullptr) { if (chand->default_service_config_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, @@ -1240,15 +1231,15 @@ bool ChannelData::ProcessResolverResultLocked( service_config = chand->default_service_config_; } } else { - service_config = result.service_config; + service_config = result->service_config; } - *service_config_error = GRPC_ERROR_REF(result.service_config_error); + *service_config_error = GRPC_ERROR_REF(result->service_config_error); if (service_config == nullptr && - result.service_config_error != GRPC_ERROR_NONE) { + result->service_config_error != GRPC_ERROR_NONE) { return false; } - UniquePtr service_config_json; // Process service config. + UniquePtr service_config_json; const internal::ClientChannelGlobalParsedObject* parsed_service_config = nullptr; if (service_config != nullptr) { @@ -1257,6 +1248,20 @@ bool ChannelData::ProcessResolverResultLocked( service_config->GetParsedGlobalServiceConfigObject( internal::ClientChannelServiceConfigParser::ParserIndex())); } + // TODO(roth): Eliminate this hack as part of hiding health check + // service name from LB policy API. As part of this, change the API + // for this function to pass in result as a const reference. + if (parsed_service_config != nullptr && + parsed_service_config->health_check_service_name() != nullptr) { + grpc_arg new_arg = grpc_channel_arg_string_create( + const_cast("grpc.temp.health_check"), + const_cast(parsed_service_config->health_check_service_name())); + grpc_channel_args* new_args = + grpc_channel_args_copy_and_add(result->args, &new_arg, 1); + grpc_channel_args_destroy(result->args); + result->args = new_args; + } + // Check if the config has changed. const bool service_config_changed = ((service_config == nullptr) != (chand->saved_service_config_ == nullptr)) || @@ -1273,12 +1278,6 @@ bool ChannelData::ProcessResolverResultLocked( chand, service_config_json.get()); } chand->saved_service_config_ = std::move(service_config); - if (parsed_service_config != nullptr) { - chand->health_check_service_name_.reset( - gpr_strdup(parsed_service_config->health_check_service_name())); - } else { - chand->health_check_service_name_.reset(); - } } // We want to set the service config at least once. This should not really be // needed, but we are doing it as a defensive approach. This can be removed, @@ -1296,7 +1295,7 @@ bool ChannelData::ProcessResolverResultLocked( chand->saved_service_config_); } UniquePtr processed_lb_policy_name; - chand->ProcessLbPolicy(result, parsed_service_config, + chand->ProcessLbPolicy(*result, parsed_service_config, &processed_lb_policy_name, lb_policy_config); // Swap out the data used by GetChannelInfo(). { diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index a7a47e9eb10..de61819ef54 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -127,7 +127,9 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) { if (subchannel_ == nullptr) { state = GRPC_CHANNEL_SHUTDOWN; } else { - state = subchannel_->CheckConnectivity(true /* inhibit_health_checking */); + state = subchannel_->CheckConnectivityState( + nullptr /* health_check_service_name */, + nullptr /* connected_subchannel */); } json = grpc_json_create_child(nullptr, json, "state", nullptr, GRPC_JSON_OBJECT, false); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index bc2f6e5efd6..199e973e72c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -68,9 +68,8 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, - grpc_combiner* combiner) - : SubchannelData(subchannel_list, address, subchannel, combiner) {} + const ServerAddress& address, Subchannel* subchannel) + : SubchannelData(subchannel_list, address, subchannel) {} void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override; @@ -312,6 +311,7 @@ void PickFirst::UpdateLocked(UpdateArgs args) { // here, since we've already checked the initial connectivity // state of all subchannels above. subchannel_list_->subchannel(0)->StartConnectivityWatchLocked(); + subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect(); } } else { // We do have a selected subchannel (which means it's READY), so keep @@ -334,6 +334,9 @@ void PickFirst::UpdateLocked(UpdateArgs args) { // state of all subchannels above. latest_pending_subchannel_list_->subchannel(0) ->StartConnectivityWatchLocked(); + latest_pending_subchannel_list_->subchannel(0) + ->subchannel() + ->AttemptToConnect(); } } } @@ -366,7 +369,8 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->subchannel_list_.get()); } p->selected_ = nullptr; - StopConnectivityWatchLocked(); + CancelConnectivityWatchLocked( + "selected subchannel failed; switching to pending update"); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); // Set our state to that of the pending subchannel list. if (p->subchannel_list_->in_transient_failure()) { @@ -391,7 +395,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p->idle_ = true; p->channel_control_helper()->RequestReresolution(); p->selected_ = nullptr; - StopConnectivityWatchLocked(); + CancelConnectivityWatchLocked("selected subchannel failed; going IDLE"); p->channel_control_helper()->UpdateState( GRPC_CHANNEL_IDLE, UniquePtr(New(p->Ref()))); @@ -408,8 +412,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( connectivity_state, UniquePtr(New(p->Ref()))); } - // Renew notification. - RenewConnectivityWatchLocked(); } } return; @@ -426,13 +428,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( subchannel_list()->set_in_transient_failure(false); switch (connectivity_state) { case GRPC_CHANNEL_READY: { - // Renew notification. - RenewConnectivityWatchLocked(); ProcessUnselectedReadyLocked(); break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { - StopConnectivityWatchLocked(); + CancelConnectivityWatchLocked("connection attempt failed"); PickFirstSubchannelData* sd = this; size_t next_index = (sd->Index() + 1) % subchannel_list()->num_subchannels(); @@ -468,8 +468,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( GRPC_CHANNEL_CONNECTING, UniquePtr(New(p->Ref()))); } - // Renew notification. - RenewConnectivityWatchLocked(); break; } case GRPC_CHANNEL_SHUTDOWN: @@ -521,8 +519,11 @@ void PickFirst::PickFirstSubchannelData:: // If current state is READY, select the subchannel now, since we started // watching from this state and will not get a notification of it // transitioning into this state. - if (p->selected_ != this && current_state == GRPC_CHANNEL_READY) { - ProcessUnselectedReadyLocked(); + // If the current state is not READY, attempt to connect. + if (current_state == GRPC_CHANNEL_READY) { + if (p->selected_ != this) ProcessUnselectedReadyLocked(); + } else { + subchannel()->AttemptToConnect(); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6d603913d82..1693032ea24 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -83,9 +83,8 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, - grpc_combiner* combiner) - : SubchannelData(subchannel_list, address, subchannel, combiner) {} + const ServerAddress& address, Subchannel* subchannel) + : SubchannelData(subchannel_list, address, subchannel) {} grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; @@ -320,6 +319,7 @@ void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { for (size_t i = 0; i < num_subchannels(); i++) { if (subchannel(i)->subchannel() != nullptr) { subchannel(i)->StartConnectivityWatchLocked(); + subchannel(i)->subchannel()->AttemptToConnect(); } } // Now set the LB policy's state based on the subchannels' states. @@ -448,6 +448,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // when the subchannel list was created, we'd wind up in a constant // loop of re-resolution. + // Also attempt to reconnect. if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, @@ -456,9 +457,8 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( p, subchannel()); } p->channel_control_helper()->RequestReresolution(); + subchannel()->AttemptToConnect(); } - // Renew connectivity watch. - RenewConnectivityWatchLocked(); // Update state counters. UpdateConnectivityStateLocked(connectivity_state); // Update overall state and renew notification. diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 4c48045f978..93b4bbd369a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -98,15 +98,13 @@ class SubchannelData { // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification - // pending (i.e., between calling StartConnectivityWatchLocked() or - // RenewConnectivityWatchLocked() and the resulting invocation of - // ProcessConnectivityChangeLocked()). + // pending (i.e., between calling StartConnectivityWatchLocked() and + // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { - GPR_ASSERT(!connectivity_notification_pending_); - pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity( - subchannel_list_->inhibit_health_checking()); - UpdateConnectedSubchannelLocked(); - return pending_connectivity_state_unsafe_; + GPR_ASSERT(pending_watcher_ == nullptr); + connectivity_state_ = subchannel()->CheckConnectivityState( + subchannel_list_->health_check_service_name(), &connected_subchannel_); + return connectivity_state_; } // Resets the connection backoff. @@ -115,23 +113,11 @@ class SubchannelData { void ResetBackoffLocked(); // Starts watching the connectivity state of the subchannel. - // ProcessConnectivityChangeLocked() will be called when the + // ProcessConnectivityChangeLocked() will be called whenever the // connectivity state changes. void StartConnectivityWatchLocked(); - // Renews watching the connectivity state of the subchannel. - void RenewConnectivityWatchLocked(); - - // Stops watching the connectivity state of the subchannel. - void StopConnectivityWatchLocked(); - // Cancels watching the connectivity state of the subchannel. - // Must be called only while there is a connectivity notification - // pending (i.e., between calling StartConnectivityWatchLocked() or - // RenewConnectivityWatchLocked() and the resulting invocation of - // ProcessConnectivityChangeLocked()). - // From within ProcessConnectivityChangeLocked(), use - // StopConnectivityWatchLocked() instead. void CancelConnectivityWatchLocked(const char* reason); // Cancels any pending connectivity watch and unrefs the subchannel. @@ -142,44 +128,80 @@ class SubchannelData { protected: SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, - grpc_combiner* combiner); + const ServerAddress& address, Subchannel* subchannel); virtual ~SubchannelData(); - // After StartConnectivityWatchLocked() or RenewConnectivityWatchLocked() - // is called, this method will be invoked when the subchannel's connectivity - // state changes. - // Implementations must invoke either RenewConnectivityWatchLocked() or - // StopConnectivityWatchLocked() before returning. + // After StartConnectivityWatchLocked() is called, this method will be + // invoked whenever the subchannel's connectivity state changes. + // To stop watching, use CancelConnectivityWatchLocked(). virtual void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) GRPC_ABSTRACT; - // Unrefs the subchannel. - void UnrefSubchannelLocked(const char* reason); - private: - // Updates connected_subchannel_ based on pending_connectivity_state_unsafe_. - // Returns true if the connectivity state should be reported. - bool UpdateConnectedSubchannelLocked(); + // Watcher for subchannel connectivity state. + class Watcher : public Subchannel::ConnectivityStateWatcher { + public: + Watcher( + SubchannelData* subchannel_data, + RefCountedPtr subchannel_list) + : subchannel_data_(subchannel_data), + subchannel_list_(std::move(subchannel_list)) {} + + ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } + + void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) override; + + grpc_pollset_set* interested_parties() override { + return subchannel_list_->policy()->interested_parties(); + } + + private: + // A fire-and-forget class that bounces into the combiner to process + // a connectivity state update. + class Updater { + public: + Updater( + SubchannelData* + subchannel_data, + RefCountedPtr> + subchannel_list, + grpc_connectivity_state state, + RefCountedPtr connected_subchannel); + + ~Updater() { + subchannel_list_.reset(DEBUG_LOCATION, "Watcher::Updater dtor"); + } + + private: + static void OnUpdateLocked(void* arg, grpc_error* error); + + SubchannelData* subchannel_data_; + RefCountedPtr> + subchannel_list_; + const grpc_connectivity_state state_; + RefCountedPtr connected_subchannel_; + grpc_closure closure_; + }; - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); + SubchannelData* subchannel_data_; + RefCountedPtr subchannel_list_; + }; + + // Unrefs the subchannel. + void UnrefSubchannelLocked(const char* reason); // Backpointer to owning subchannel list. Not owned. SubchannelList* subchannel_list_; - - // The subchannel and connected subchannel. + // The subchannel. Subchannel* subchannel_; + // Will be non-null when the subchannel's state is being watched. + Subchannel::ConnectivityStateWatcher* pending_watcher_ = nullptr; + // Data updated by the watcher. + grpc_connectivity_state connectivity_state_; RefCountedPtr connected_subchannel_; - - // Notification that connectivity has changed on subchannel. - grpc_closure connectivity_changed_closure_; - // Is a connectivity notification pending? - bool connectivity_notification_pending_ = false; - // Connectivity state to be updated by - // grpc_subchannel_notify_on_state_change(), not guarded by - // the combiner. - grpc_connectivity_state pending_connectivity_state_unsafe_; }; // A list of subchannels. @@ -213,7 +235,9 @@ class SubchannelList : public InternallyRefCounted { // Accessors. LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } - bool inhibit_health_checking() const { return inhibit_health_checking_; } + const char* health_check_service_name() const { + return health_check_service_name_.get(); + } // Resets connection backoff of all subchannels. // TODO(roth): We will probably need to rethink this as part of moving @@ -251,7 +275,7 @@ class SubchannelList : public InternallyRefCounted { TraceFlag* tracer_; - bool inhibit_health_checking_; + UniquePtr health_check_service_name_; grpc_combiner* combiner_; @@ -268,6 +292,67 @@ class SubchannelList : public InternallyRefCounted { // implementation -- no user-servicable parts below // +// +// SubchannelData::Watcher +// + +template +void SubchannelData::Watcher:: + OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) { + // Will delete itself. + New(subchannel_data_, + subchannel_list_->Ref(DEBUG_LOCATION, "Watcher::Updater"), + new_state, std::move(connected_subchannel)); +} + +template +SubchannelData::Watcher::Updater:: + Updater( + SubchannelData* subchannel_data, + RefCountedPtr> + subchannel_list, + grpc_connectivity_state state, + RefCountedPtr connected_subchannel) + : subchannel_data_(subchannel_data), + subchannel_list_(std::move(subchannel_list)), + state_(state), + connected_subchannel_(std::move(connected_subchannel)) { + GRPC_CLOSURE_INIT(&closure_, &OnUpdateLocked, this, + grpc_combiner_scheduler(subchannel_list_->combiner_)); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); +} + +template +void SubchannelData::Watcher::Updater:: + OnUpdateLocked(void* arg, grpc_error* error) { + Updater* self = static_cast(arg); + SubchannelData* sd = self->subchannel_data_; + if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) { + gpr_log(GPR_INFO, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): connectivity changed: state=%s, " + "connected_subchannel=%p, shutting_down=%d, pending_watcher=%p", + sd->subchannel_list_->tracer()->name(), + sd->subchannel_list_->policy(), sd->subchannel_list_, sd->Index(), + sd->subchannel_list_->num_subchannels(), sd->subchannel_, + grpc_connectivity_state_name(self->state_), + self->connected_subchannel_.get(), + sd->subchannel_list_->shutting_down(), sd->pending_watcher_); + } + if (!sd->subchannel_list_->shutting_down() && + sd->pending_watcher_ != nullptr) { + sd->connectivity_state_ = self->state_; + // Get or release ref to connected subchannel. + sd->connected_subchannel_ = std::move(self->connected_subchannel_); + // Call the subclass's ProcessConnectivityChangeLocked() method. + sd->ProcessConnectivityChangeLocked(sd->connectivity_state_); + } + // Clean up. + Delete(self); +} + // // SubchannelData // @@ -275,23 +360,16 @@ class SubchannelList : public InternallyRefCounted { template SubchannelData::SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, - grpc_combiner* combiner) + const ServerAddress& address, Subchannel* subchannel) : subchannel_list_(subchannel_list), subchannel_(subchannel), // We assume that the current state is IDLE. If not, we'll get a // callback telling us that. - pending_connectivity_state_unsafe_(GRPC_CHANNEL_IDLE) { - GRPC_CLOSURE_INIT( - &connectivity_changed_closure_, - (&SubchannelData::OnConnectivityChangedLocked), - this, grpc_combiner_scheduler(combiner)); -} + connectivity_state_(GRPC_CHANNEL_IDLE) {} template SubchannelData::~SubchannelData() { - UnrefSubchannelLocked("subchannel_data_destroy"); + GPR_ASSERT(subchannel_ == nullptr); } template @@ -326,56 +404,19 @@ void SubchannelDatatracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): starting watch: requesting connectivity change " - "notification (from %s)", - subchannel_list_->tracer()->name(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_, - grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); - } - GPR_ASSERT(!connectivity_notification_pending_); - connectivity_notification_pending_ = true; - subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release(); - subchannel_->NotifyOnStateChange( - subchannel_list_->policy()->interested_parties(), - &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, - subchannel_list_->inhibit_health_checking()); -} - -template -void SubchannelData::RenewConnectivityWatchLocked() { - if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): renewing watch: requesting connectivity change " - "notification (from %s)", + " (subchannel %p): starting watch (from %s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_, - grpc_connectivity_state_name(pending_connectivity_state_unsafe_)); + subchannel_, grpc_connectivity_state_name(connectivity_state_)); } - GPR_ASSERT(connectivity_notification_pending_); - subchannel_->NotifyOnStateChange( - subchannel_list_->policy()->interested_parties(), - &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, - subchannel_list_->inhibit_health_checking()); -} - -template -void SubchannelData::StopConnectivityWatchLocked() { - if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): stopping connectivity watch", - subchannel_list_->tracer()->name(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_); - } - GPR_ASSERT(connectivity_notification_pending_); - connectivity_notification_pending_ = false; - subchannel_list()->Unref(DEBUG_LOCATION, "connectivity_watch"); + GPR_ASSERT(pending_watcher_ == nullptr); + pending_watcher_ = + New(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); + subchannel_->WatchConnectivityState( + connectivity_state_, + UniquePtr( + gpr_strdup(subchannel_list_->health_check_service_name())), + UniquePtr(pending_watcher_)); } template @@ -389,91 +430,17 @@ void SubchannelData:: subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_, reason); } - GPR_ASSERT(connectivity_notification_pending_); - subchannel_->NotifyOnStateChange(nullptr, nullptr, - &connectivity_changed_closure_, - subchannel_list_->inhibit_health_checking()); -} - -template -bool SubchannelData::UpdateConnectedSubchannelLocked() { - // If the subchannel is READY, take a ref to the connected subchannel. - if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) { - connected_subchannel_ = subchannel_->connected_subchannel(); - // If the subchannel became disconnected between the time that READY - // was reported and the time we got here (e.g., between when a - // notification callback is scheduled and when it was actually run in - // the combiner), then the connected subchannel may have disappeared out - // from under us. In that case, we don't actually want to consider the - // subchannel to be in state READY. Instead, we use IDLE as the - // basis for any future connectivity watch; this is the one state that - // the subchannel will never transition back into, so this ensures - // that we will get a notification for the next state, even if that state - // is READY again (e.g., if the subchannel has transitioned back to - // READY before the next watch gets requested). - if (connected_subchannel_ == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { - gpr_log(GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): state is READY but connected subchannel is " - "null; moving to state IDLE", - subchannel_list_->tracer()->name(), subchannel_list_->policy(), - subchannel_list_, Index(), subchannel_list_->num_subchannels(), - subchannel_); - } - pending_connectivity_state_unsafe_ = GRPC_CHANNEL_IDLE; - return false; - } - } else { - // For any state other than READY, unref the connected subchannel. - connected_subchannel_.reset(); - } - return true; -} - -template -void SubchannelData:: - OnConnectivityChangedLocked(void* arg, grpc_error* error) { - SubchannelData* sd = static_cast(arg); - if (GRPC_TRACE_FLAG_ENABLED(*sd->subchannel_list_->tracer())) { - gpr_log( - GPR_INFO, - "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): connectivity changed: state=%s, error=%s, " - "shutting_down=%d", - sd->subchannel_list_->tracer()->name(), sd->subchannel_list_->policy(), - sd->subchannel_list_, sd->Index(), - sd->subchannel_list_->num_subchannels(), sd->subchannel_, - grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe_), - grpc_error_string(error), sd->subchannel_list_->shutting_down()); - } - // If shutting down, unref subchannel and stop watching. - if (sd->subchannel_list_->shutting_down() || error == GRPC_ERROR_CANCELLED) { - sd->UnrefSubchannelLocked("connectivity_shutdown"); - sd->StopConnectivityWatchLocked(); - return; + if (pending_watcher_ != nullptr) { + subchannel_->CancelConnectivityStateWatch( + subchannel_list_->health_check_service_name(), pending_watcher_); + pending_watcher_ = nullptr; } - // Get or release ref to connected subchannel. - if (!sd->UpdateConnectedSubchannelLocked()) { - // We don't want to report this connectivity state, so renew the watch. - sd->RenewConnectivityWatchLocked(); - return; - } - // Call the subclass's ProcessConnectivityChangeLocked() method. - sd->ProcessConnectivityChangeLocked(sd->pending_connectivity_state_unsafe_); } template void SubchannelData::ShutdownLocked() { - // If there's a pending notification for this subchannel, cancel it; - // the callback is responsible for unreffing the subchannel. - // Otherwise, unref the subchannel directly. - if (connectivity_notification_pending_) { - CancelConnectivityWatchLocked("shutdown"); - } else if (subchannel_ != nullptr) { - UnrefSubchannelLocked("shutdown"); - } + if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown"); + UnrefSubchannelLocked("shutdown"); } // @@ -496,14 +463,25 @@ SubchannelList::SubchannelList( tracer_->name(), policy, this, addresses.size()); } subchannels_.reserve(addresses.size()); + // Find health check service name. + const bool inhibit_health_checking = grpc_channel_arg_get_bool( + grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); + if (!inhibit_health_checking) { + const char* health_check_service_name = grpc_channel_arg_get_string( + grpc_channel_args_find(&args, "grpc.temp.health_check")); + if (health_check_service_name != nullptr) { + health_check_service_name_.reset(gpr_strdup(health_check_service_name)); + } + } // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. - // We also remove the inhibit-health-checking arg, since we are + // We also remove the health-checking-related args, since we are // handling that here. - inhibit_health_checking_ = grpc_channel_arg_get_bool( - grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false); - static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, - GRPC_ARG_INHIBIT_HEALTH_CHECKING}; + // We remove the service config, since it will be passed into the + // subchannel via call context. + static const char* keys_to_remove[] = { + GRPC_ARG_SUBCHANNEL_ADDRESS, "grpc.temp.health_check", + GRPC_ARG_INHIBIT_HEALTH_CHECKING, GRPC_ARG_SERVICE_CONFIG}; // Create a subchannel for each address. for (size_t i = 0; i < addresses.size(); i++) { // TODO(roth): we should ideally hide this from the LB policy code. In @@ -549,7 +527,7 @@ SubchannelList::SubchannelList( address_uri); gpr_free(address_uri); } - subchannels_.emplace_back(this, addresses[i], subchannel, combiner); + subchannels_.emplace_back(this, addresses[i], subchannel); } } diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index b6bc3eabc49..4e383f65dd1 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -536,7 +536,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked( if (process_resolver_result_ != nullptr) { grpc_error* service_config_error = GRPC_ERROR_NONE; service_config_changed = process_resolver_result_( - process_resolver_result_user_data_, result, &lb_policy_name, + process_resolver_result_user_data_, &result, &lb_policy_name, &lb_policy_config, &service_config_error); if (service_config_error != GRPC_ERROR_NONE) { service_config_error_string = diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.h b/src/core/ext/filters/client_channel/resolving_lb_policy.h index b7d99dcc7de..0ca6c9563f9 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.h +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.h @@ -69,8 +69,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy { // empty, it means that we don't have a valid service config to use, and we // should set the channel to be in TRANSIENT_FAILURE. typedef bool (*ProcessResolverResultCallback)( - void* user_data, const Resolver::Result& result, - const char** lb_policy_name, + void* user_data, Resolver::Result* result, const char** lb_policy_name, RefCountedPtr* lb_policy_config, grpc_error** service_config_error); // If error is set when this returns, then construction failed, and diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index a284e692b09..cd778976166 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -303,8 +303,7 @@ void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location, // Subchannel::ConnectedSubchannelStateWatcher // -class Subchannel::ConnectedSubchannelStateWatcher - : public InternallyRefCounted { +class Subchannel::ConnectedSubchannelStateWatcher { public: // Must be instantiated while holding c->mu. explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { @@ -312,38 +311,17 @@ class Subchannel::ConnectedSubchannelStateWatcher GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); // Start watching for connectivity state changes. - // Callback uses initial ref to this. GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this, grpc_schedule_on_exec_ctx); c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_, &pending_connectivity_state_, &on_connectivity_changed_); - // Start health check if needed. - grpc_connectivity_state health_state = GRPC_CHANNEL_READY; - if (c->health_check_service_name_ != nullptr) { - health_check_client_ = MakeOrphanable( - c->health_check_service_name_.get(), c->connected_subchannel_, - c->pollset_set_, c->channelz_node_); - GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, - grpc_schedule_on_exec_ctx); - Ref().release(); // Ref for health callback tracked manually. - health_check_client_->NotifyOnHealthChange(&health_state_, - &on_health_changed_); - health_state = GRPC_CHANNEL_CONNECTING; - } - // Report initial state. - c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, "subchannel_connected"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state, - "subchannel_connected"); } ~ConnectedSubchannelStateWatcher() { GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher"); } - // Must be called while holding subchannel_->mu. - void Orphan() override { health_check_client_.reset(); } - private: static void OnConnectivityChanged(void* arg, grpc_error* error) { auto* self = static_cast(arg); @@ -363,20 +341,10 @@ class Subchannel::ConnectedSubchannelStateWatcher self->pending_connectivity_state_)); } c->connected_subchannel_.reset(); - c->connected_subchannel_watcher_.reset(); - self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - "reflect_child"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "reflect_child"); + c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE); c->backoff_begun_ = false; c->backoff_.Reset(); - c->MaybeStartConnectingLocked(); - } else { - self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN; } - self->health_check_client_.reset(); break; } default: { @@ -384,96 +352,246 @@ class Subchannel::ConnectedSubchannelStateWatcher // a callback for READY, because that was the state we started // this watch from. And a connected subchannel should never go // from READY to CONNECTING or IDLE. - self->last_connectivity_state_ = self->pending_connectivity_state_; - c->SetConnectivityStateLocked(self->pending_connectivity_state_, - "reflect_child"); - if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker_, - self->pending_connectivity_state_, - "reflect_child"); - } + c->SetConnectivityStateLocked(self->pending_connectivity_state_); c->connected_subchannel_->NotifyOnStateChange( nullptr, &self->pending_connectivity_state_, &self->on_connectivity_changed_); - self = nullptr; // So we don't unref below. + return; // So we don't delete ourself below. } } } - // Don't unref until we've released the lock, because this might + // Don't delete until we've released the lock, because this might // cause the subchannel (which contains the lock) to be destroyed. - if (self != nullptr) self->Unref(); + Delete(self); + } + + Subchannel* subchannel_; + grpc_closure on_connectivity_changed_; + grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; +}; + +// +// Subchannel::ConnectivityStateWatcherList +// + +void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked( + UniquePtr watcher) { + watcher->next_ = head_; + head_ = watcher.release(); +} + +void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( + ConnectivityStateWatcher* watcher) { + for (ConnectivityStateWatcher** w = &head_; *w != nullptr; w = &(*w)->next_) { + if (*w == watcher) { + *w = watcher->next_; + Delete(watcher); + return; + } + } + GPR_UNREACHABLE_CODE(return ); +} + +void Subchannel::ConnectivityStateWatcherList::NotifyLocked( + Subchannel* subchannel, grpc_connectivity_state state) { + for (ConnectivityStateWatcher* w = head_; w != nullptr; w = w->next_) { + RefCountedPtr connected_subchannel; + if (state == GRPC_CHANNEL_READY) { + connected_subchannel = subchannel->connected_subchannel_; + } + // TODO(roth): In principle, it seems wrong to send this notification + // to the watcher while holding the subchannel's mutex, since it could + // lead to a deadlock if the watcher calls back into the subchannel + // before returning back to us. In practice, this doesn't happen, + // because the LB policy code that watches subchannels always bounces + // the notification into the client_channel control-plane combiner + // before processing it. But if we ever have any other callers here, + // we will probably need to change this. + w->OnConnectivityStateChange(state, std::move(connected_subchannel)); + } +} + +void Subchannel::ConnectivityStateWatcherList::Clear() { + while (head_ != nullptr) { + ConnectivityStateWatcher* next = head_->next_; + Delete(head_); + head_ = next; + } +} + +// +// Subchannel::HealthWatcherMap::HealthWatcher +// + +// State needed for tracking the connectivity state with a particular +// health check service name. +class Subchannel::HealthWatcherMap::HealthWatcher + : public InternallyRefCounted { + public: + HealthWatcher(Subchannel* c, UniquePtr health_check_service_name, + grpc_connectivity_state subchannel_state) + : subchannel_(c), + health_check_service_name_(std::move(health_check_service_name)), + state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING + : subchannel_state) { + GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher"); + GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, + grpc_schedule_on_exec_ctx); + // If the subchannel is already connected, start health checking. + if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked(); + } + + ~HealthWatcher() { + GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher"); + } + + const char* health_check_service_name() const { + return health_check_service_name_.get(); + } + + grpc_connectivity_state state() const { return state_; } + + void AddWatcherLocked(grpc_connectivity_state initial_state, + UniquePtr watcher) { + if (state_ != initial_state) { + RefCountedPtr connected_subchannel; + if (state_ == GRPC_CHANNEL_READY) { + connected_subchannel = subchannel_->connected_subchannel_; + } + watcher->OnConnectivityStateChange(state_, + std::move(connected_subchannel)); + } + watcher_list_.AddWatcherLocked(std::move(watcher)); + } + + void RemoveWatcherLocked(ConnectivityStateWatcher* watcher) { + watcher_list_.RemoveWatcherLocked(watcher); + } + + bool HasWatchers() const { return !watcher_list_.empty(); } + + void NotifyLocked(grpc_connectivity_state state) { + if (state == GRPC_CHANNEL_READY) { + // If we had not already notified for CONNECTING state, do so now. + // (We may have missed this earlier, because if the transition + // from IDLE to CONNECTING to READY was too quick, the connected + // subchannel may not have sent us a notification for CONNECTING.) + if (state_ != GRPC_CHANNEL_CONNECTING) { + state_ = GRPC_CHANNEL_CONNECTING; + watcher_list_.NotifyLocked(subchannel_, state_); + } + // If we've become connected, start health checking. + StartHealthCheckingLocked(); + } else { + state_ = state; + watcher_list_.NotifyLocked(subchannel_, state_); + // We're not connected, so stop health checking. + health_check_client_.reset(); + } + } + + void Orphan() override { + watcher_list_.Clear(); + health_check_client_.reset(); + Unref(); + } + + private: + void StartHealthCheckingLocked() { + GPR_ASSERT(health_check_client_ == nullptr); + health_check_client_ = MakeOrphanable( + health_check_service_name_.get(), subchannel_->connected_subchannel_, + subchannel_->pollset_set_, subchannel_->channelz_node_); + Ref().release(); // Ref for health callback tracked manually. + health_check_client_->NotifyOnHealthChange(&state_, &on_health_changed_); } static void OnHealthChanged(void* arg, grpc_error* error) { - auto* self = static_cast(arg); + auto* self = static_cast(arg); Subchannel* c = self->subchannel_; { MutexLock lock(&c->mu_); - if (self->health_state_ != GRPC_CHANNEL_SHUTDOWN && + if (self->state_ != GRPC_CHANNEL_SHUTDOWN && self->health_check_client_ != nullptr) { - if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker_, - self->health_state_, "health_changed"); - } + self->watcher_list_.NotifyLocked(c, self->state_); + // Renew watch. self->health_check_client_->NotifyOnHealthChange( - &self->health_state_, &self->on_health_changed_); - self = nullptr; // So we don't unref below. + &self->state_, &self->on_health_changed_); + return; // So we don't unref below. } } // Don't unref until we've released the lock, because this might // cause the subchannel (which contains the lock) to be destroyed. - if (self != nullptr) self->Unref(); + self->Unref(); } Subchannel* subchannel_; - grpc_closure on_connectivity_changed_; - grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; - grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY; + UniquePtr health_check_service_name_; OrphanablePtr health_check_client_; grpc_closure on_health_changed_; - grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING; + grpc_connectivity_state state_; + ConnectivityStateWatcherList watcher_list_; }; // -// Subchannel::ExternalStateWatcher +// Subchannel::HealthWatcherMap // -struct Subchannel::ExternalStateWatcher { - ExternalStateWatcher(Subchannel* subchannel, grpc_pollset_set* pollset_set, - grpc_closure* notify) - : subchannel(subchannel), pollset_set(pollset_set), notify(notify) { - GRPC_SUBCHANNEL_WEAK_REF(subchannel, "external_state_watcher+init"); - GRPC_CLOSURE_INIT(&on_state_changed, OnStateChanged, this, - grpc_schedule_on_exec_ctx); +void Subchannel::HealthWatcherMap::AddWatcherLocked( + Subchannel* subchannel, grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher) { + // If the health check service name is not already present in the map, + // add it. + auto it = map_.find(health_check_service_name.get()); + HealthWatcher* health_watcher; + if (it == map_.end()) { + const char* key = health_check_service_name.get(); + auto w = MakeOrphanable( + subchannel, std::move(health_check_service_name), subchannel->state_); + health_watcher = w.get(); + map_[key] = std::move(w); + } else { + health_watcher = it->second.get(); + } + // Add the watcher to the entry. + health_watcher->AddWatcherLocked(initial_state, std::move(watcher)); +} + +void Subchannel::HealthWatcherMap::RemoveWatcherLocked( + const char* health_check_service_name, ConnectivityStateWatcher* watcher) { + auto it = map_.find(health_check_service_name); + GPR_ASSERT(it != map_.end()); + it->second->RemoveWatcherLocked(watcher); + // If we just removed the last watcher for this service name, remove + // the map entry. + if (!it->second->HasWatchers()) map_.erase(it); +} + +void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state) { + for (const auto& p : map_) { + p.second->NotifyLocked(state); } +} - static void OnStateChanged(void* arg, grpc_error* error) { - ExternalStateWatcher* w = static_cast(arg); - grpc_closure* follow_up = w->notify; - if (w->pollset_set != nullptr) { - grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_, - w->pollset_set); - } - { - MutexLock lock(&w->subchannel->mu_); - if (w->subchannel->external_state_watcher_list_ == w) { - w->subchannel->external_state_watcher_list_ = w->next; - } - if (w->next != nullptr) w->next->prev = w->prev; - if (w->prev != nullptr) w->prev->next = w->next; - } - GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done"); - Delete(w); - GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); +grpc_connectivity_state +Subchannel::HealthWatcherMap::CheckConnectivityStateLocked( + Subchannel* subchannel, const char* health_check_service_name) { + auto it = map_.find(health_check_service_name); + if (it == map_.end()) { + // If the health check service name is not found in the map, we're + // not currently doing a health check for that service name. If the + // subchannel's state without health checking is READY, report + // CONNECTING, since that's what we'd be in as soon as we do start a + // watch. Otherwise, report the channel's state without health checking. + return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING + : subchannel->state_; } + HealthWatcher* health_watcher = it->second.get(); + return health_watcher->state(); +} - Subchannel* subchannel; - grpc_pollset_set* pollset_set; - grpc_closure* notify; - grpc_closure on_state_changed; - ExternalStateWatcher* next = nullptr; - ExternalStateWatcher* prev = nullptr; -}; +void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); } // // Subchannel @@ -560,13 +678,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, if (new_args != nullptr) grpc_channel_args_destroy(new_args); GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, grpc_schedule_on_exec_ctx); - grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, - "subchannel"); - grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE, - "subchannel"); - health_check_service_name_ = - UniquePtr(gpr_strdup(grpc_channel_arg_get_string( - grpc_channel_args_find(args_, "grpc.temp.health_check")))); const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ); const bool channelz_enabled = grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT); @@ -593,8 +704,6 @@ Subchannel::~Subchannel() { channelz_node_->MarkSubchannelDestroyed(); } grpc_channel_args_destroy(args_); - grpc_connectivity_state_destroy(&state_tracker_); - grpc_connectivity_state_destroy(&state_and_health_tracker_); grpc_connector_unref(connector_); grpc_pollset_set_destroy(pollset_set_); Delete(key_); @@ -698,55 +807,67 @@ const char* Subchannel::GetTargetAddress() { return addr_str; } -RefCountedPtr Subchannel::connected_subchannel() { - MutexLock lock(&mu_); - return connected_subchannel_; -} - channelz::SubchannelNode* Subchannel::channelz_node() { return channelz_node_.get(); } -grpc_connectivity_state Subchannel::CheckConnectivity( - bool inhibit_health_checking) { - grpc_connectivity_state_tracker* tracker = - inhibit_health_checking ? &state_tracker_ : &state_and_health_tracker_; - grpc_connectivity_state state = grpc_connectivity_state_check(tracker); +grpc_connectivity_state Subchannel::CheckConnectivityState( + const char* health_check_service_name, + RefCountedPtr* connected_subchannel) { + MutexLock lock(&mu_); + grpc_connectivity_state state; + if (health_check_service_name == nullptr) { + state = state_; + } else { + state = health_watcher_map_.CheckConnectivityStateLocked( + this, health_check_service_name); + } + if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) { + *connected_subchannel = connected_subchannel_; + } return state; } -void Subchannel::NotifyOnStateChange(grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* notify, - bool inhibit_health_checking) { - grpc_connectivity_state_tracker* tracker = - inhibit_health_checking ? &state_tracker_ : &state_and_health_tracker_; - ExternalStateWatcher* w; - if (state == nullptr) { - MutexLock lock(&mu_); - for (w = external_state_watcher_list_; w != nullptr; w = w->next) { - if (w->notify == notify) { - grpc_connectivity_state_notify_on_state_change(tracker, nullptr, - &w->on_state_changed); - } +void Subchannel::WatchConnectivityState( + grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher) { + MutexLock lock(&mu_); + grpc_pollset_set* interested_parties = watcher->interested_parties(); + if (interested_parties != nullptr) { + grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); + } + if (health_check_service_name == nullptr) { + if (state_ != initial_state) { + watcher->OnConnectivityStateChange(state_, connected_subchannel_); } + watcher_list_.AddWatcherLocked(std::move(watcher)); } else { - w = New(this, interested_parties, notify); - if (interested_parties != nullptr) { - grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); - } - MutexLock lock(&mu_); - if (external_state_watcher_list_ != nullptr) { - w->next = external_state_watcher_list_; - w->next->prev = w; - } - external_state_watcher_list_ = w; - grpc_connectivity_state_notify_on_state_change(tracker, state, - &w->on_state_changed); - MaybeStartConnectingLocked(); + health_watcher_map_.AddWatcherLocked(this, initial_state, + std::move(health_check_service_name), + std::move(watcher)); } } +void Subchannel::CancelConnectivityStateWatch( + const char* health_check_service_name, ConnectivityStateWatcher* watcher) { + MutexLock lock(&mu_); + grpc_pollset_set* interested_parties = watcher->interested_parties(); + if (interested_parties != nullptr) { + grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties); + } + if (health_check_service_name == nullptr) { + watcher_list_.RemoveWatcherLocked(watcher); + } else { + health_watcher_map_.RemoveWatcherLocked(health_check_service_name, watcher); + } +} + +void Subchannel::AttemptToConnect() { + MutexLock lock(&mu_); + MaybeStartConnectingLocked(); +} + void Subchannel::ResetBackoff() { MutexLock lock(&mu_); backoff_.Reset(); @@ -818,15 +939,19 @@ const char* SubchannelConnectivityStateChangeString( } // namespace -void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, - const char* reason) { +// Note: Must be called with a state that is different from the current state. +void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state) { + state_ = state; if (channelz_node_ != nullptr) { channelz_node_->AddTraceEvent( channelz::ChannelTrace::Severity::Info, grpc_slice_from_static_string( SubchannelConnectivityStateChangeString(state))); } - grpc_connectivity_state_set(&state_tracker_, state, reason); + // Notify non-health watchers. + watcher_list_.NotifyLocked(this, state); + // Notify health watchers. + health_watcher_map_.NotifyLocked(state); } void Subchannel::MaybeStartConnectingLocked() { @@ -842,11 +967,6 @@ void Subchannel::MaybeStartConnectingLocked() { // Already connected: don't restart. return; } - if (!grpc_connectivity_state_has_watchers(&state_tracker_) && - !grpc_connectivity_state_has_watchers(&state_and_health_tracker_)) { - // Nobody is interested in connecting: so don't just yet. - return; - } connecting_ = true; GRPC_SUBCHANNEL_WEAK_REF(this, "connecting"); if (!backoff_begun_) { @@ -903,9 +1023,7 @@ void Subchannel::ContinueConnectingLocked() { next_attempt_deadline_ = backoff_.NextAttemptTime(); args.deadline = std::max(next_attempt_deadline_, min_deadline); args.channel_args = args_; - SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, "connecting"); - grpc_connectivity_state_set(&state_and_health_tracker_, - GRPC_CHANNEL_CONNECTING, "connecting"); + SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING); grpc_connector_connect(connector_, &args, &connecting_result_, &on_connecting_finished_); } @@ -924,12 +1042,7 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } else { gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error)); - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - "connect_failed"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connect_failed"); - c->MaybeStartConnectingLocked(); + c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } } @@ -982,8 +1095,9 @@ bool Subchannel::PublishTransportLocked() { gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", connected_subchannel_.get(), this); // Instantiate state watcher. Will clean itself up. - connected_subchannel_watcher_ = - MakeOrphanable(this); + New(this); + // Report initial state. + SetConnectivityStateLocked(GRPC_CHANNEL_READY); return true; } @@ -1000,7 +1114,7 @@ void Subchannel::Disconnect() { grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Subchannel disconnected")); connected_subchannel_.reset(); - connected_subchannel_watcher_.reset(); + health_watcher_map_.ShutdownLocked(); } gpr_atm Subchannel::RefMutate( diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index e3efc8900ae..e0741bb28fa 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -27,6 +27,7 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gprpp/arena.h" +#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" @@ -77,7 +78,7 @@ class ConnectedSubchannel : public RefCounted { grpc_millis deadline; Arena* arena; grpc_call_context_element* context; - grpc_core::CallCombiner* call_combiner; + CallCombiner* call_combiner; size_t parent_data_size; }; @@ -175,7 +176,38 @@ class SubchannelCall { // A subchannel that knows how to connect to exactly one target address. It // provides a target for load balancing. class Subchannel { + private: + class ConnectivityStateWatcherList; // Forward declaration. + public: + class ConnectivityStateWatcher { + public: + virtual ~ConnectivityStateWatcher() = default; + + // Will be invoked whenever the subchannel's connectivity state + // changes. There will be only one invocation of this method on a + // given watcher instance at any given time. + // + // When the state changes to READY, connected_subchannel will + // contain a ref to the connected subchannel. When it changes from + // READY to some other state, the implementation must release its + // ref to the connected subchannel. + virtual void OnConnectivityStateChange( + grpc_connectivity_state new_state, + RefCountedPtr connected_subchannel) // NOLINT + GRPC_ABSTRACT; + + virtual grpc_pollset_set* interested_parties() GRPC_ABSTRACT; + + GRPC_ABSTRACT_BASE_CLASS + + private: + // For access to next_. + friend class Subchannel::ConnectivityStateWatcherList; + + ConnectivityStateWatcher* next_ = nullptr; + }; + // The ctor and dtor are not intended to use directly. Subchannel(SubchannelKey* key, grpc_connector* connector, const grpc_channel_args* args); @@ -201,20 +233,36 @@ class Subchannel { // Caller doesn't take ownership. const char* GetTargetAddress(); - // Gets the connected subchannel - or nullptr if not connected (which may - // happen before it initially connects or during transient failures). - RefCountedPtr connected_subchannel(); - channelz::SubchannelNode* channelz_node(); - // Polls the current connectivity state of the subchannel. - grpc_connectivity_state CheckConnectivity(bool inhibit_health_checking); - - // When the connectivity state of the subchannel changes from \a *state, - // invokes \a notify and updates \a *state with the new state. - void NotifyOnStateChange(grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify, - bool inhibit_health_checking); + // Returns the current connectivity state of the subchannel. + // If health_check_service_name is non-null, the returned connectivity + // state will be based on the state reported by the backend for that + // service name. + // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel. + grpc_connectivity_state CheckConnectivityState( + const char* health_check_service_name, + RefCountedPtr* connected_subchannel); + + // Starts watching the subchannel's connectivity state. + // The first callback to the watcher will be delivered when the + // subchannel's connectivity state becomes a value other than + // initial_state, which may happen immediately. + // Subsequent callbacks will be delivered as the subchannel's state + // changes. + // The watcher will be destroyed either when the subchannel is + // destroyed or when CancelConnectivityStateWatch() is called. + void WatchConnectivityState(grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher); + + // Cancels a connectivity state watch. + // If the watcher has already been destroyed, this is a no-op. + void CancelConnectivityStateWatch(const char* health_check_service_name, + ConnectivityStateWatcher* watcher); + + // Attempt to connect to the backend. Has no effect if already connected. + void AttemptToConnect(); // Resets the connection backoff of the subchannel. // TODO(roth): Move connection backoff out of subchannels and up into LB @@ -236,12 +284,62 @@ class Subchannel { grpc_resolved_address* addr); private: - struct ExternalStateWatcher; + // A linked list of ConnectivityStateWatchers that are monitoring the + // subchannel's state. + class ConnectivityStateWatcherList { + public: + ~ConnectivityStateWatcherList() { Clear(); } + + void AddWatcherLocked(UniquePtr watcher); + void RemoveWatcherLocked(ConnectivityStateWatcher* watcher); + + // Notifies all watchers in the list about a change to state. + void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state); + + void Clear(); + + bool empty() const { return head_ == nullptr; } + + private: + ConnectivityStateWatcher* head_ = nullptr; + }; + + // A map that tracks ConnectivityStateWatchers using a particular health + // check service name. + // + // There is one entry in the map for each health check service name. + // Entries exist only as long as there are watchers using the + // corresponding service name. + // + // A health check client is maintained only while the subchannel is in + // state READY. + class HealthWatcherMap { + public: + void AddWatcherLocked(Subchannel* subchannel, + grpc_connectivity_state initial_state, + UniquePtr health_check_service_name, + UniquePtr watcher); + void RemoveWatcherLocked(const char* health_check_service_name, + ConnectivityStateWatcher* watcher); + + // Notifies the watcher when the subchannel's state changes. + void NotifyLocked(grpc_connectivity_state state); + + grpc_connectivity_state CheckConnectivityStateLocked( + Subchannel* subchannel, const char* health_check_service_name); + + void ShutdownLocked(); + + private: + class HealthWatcher; + + Map, StringLess> map_; + }; + class ConnectedSubchannelStateWatcher; // Sets the subchannel's connectivity state to \a state. - void SetConnectivityStateLocked(grpc_connectivity_state state, - const char* reason); + void SetConnectivityStateLocked(grpc_connectivity_state state); // Methods for connection. void MaybeStartConnectingLocked(); @@ -279,15 +377,15 @@ class Subchannel { grpc_closure on_connecting_finished_; // Active connection, or null. RefCountedPtr connected_subchannel_; - OrphanablePtr connected_subchannel_watcher_; bool connecting_ = false; bool disconnected_ = false; // Connectivity state tracking. - grpc_connectivity_state_tracker state_tracker_; - grpc_connectivity_state_tracker state_and_health_tracker_; - UniquePtr health_check_service_name_; - ExternalStateWatcher* external_state_watcher_list_ = nullptr; + grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; + // The list of watchers without a health check service name. + ConnectivityStateWatcherList watcher_list_; + // The map of watchers with health check service names. + HealthWatcherMap health_watcher_map_; // Backoff state. BackOff backoff_; diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 766c38a64a1..b623348e13f 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -1019,8 +1019,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { auto channel = BuildChannel("round_robin"); auto stub = BuildStub(channel); std::vector ports; - // Start with a single server. + gpr_log(GPR_INFO, "*** FIRST BACKEND ***"); ports.emplace_back(servers_[0]->port_); SetNextResolution(ports); WaitForServer(stub, 0, DEBUG_LOCATION); @@ -1030,36 +1030,33 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); servers_[0]->service_.ResetCounters(); - // And now for the second server. + gpr_log(GPR_INFO, "*** SECOND BACKEND ***"); ports.clear(); ports.emplace_back(servers_[1]->port_); SetNextResolution(ports); - // Wait until update has been processed, as signaled by the second backend // receiving a request. EXPECT_EQ(0, servers_[1]->service_.request_count()); WaitForServer(stub, 1, DEBUG_LOCATION); - for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(10, servers_[1]->service_.request_count()); EXPECT_EQ(0, servers_[2]->service_.request_count()); servers_[1]->service_.ResetCounters(); - // ... and for the last server. + gpr_log(GPR_INFO, "*** THIRD BACKEND ***"); ports.clear(); ports.emplace_back(servers_[2]->port_); SetNextResolution(ports); WaitForServer(stub, 2, DEBUG_LOCATION); - for (size_t i = 0; i < 10; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(0, servers_[0]->service_.request_count()); EXPECT_EQ(0, servers_[1]->service_.request_count()); EXPECT_EQ(10, servers_[2]->service_.request_count()); servers_[2]->service_.ResetCounters(); - // Back to all servers. + gpr_log(GPR_INFO, "*** ALL BACKENDS ***"); ports.clear(); ports.emplace_back(servers_[0]->port_); ports.emplace_back(servers_[1]->port_); @@ -1068,14 +1065,13 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { WaitForServer(stub, 0, DEBUG_LOCATION); WaitForServer(stub, 1, DEBUG_LOCATION); WaitForServer(stub, 2, DEBUG_LOCATION); - // Send three RPCs, one per server. for (size_t i = 0; i < 3; ++i) CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(1, servers_[0]->service_.request_count()); EXPECT_EQ(1, servers_[1]->service_.request_count()); EXPECT_EQ(1, servers_[2]->service_.request_count()); - // An empty update will result in the channel going into TRANSIENT_FAILURE. + gpr_log(GPR_INFO, "*** NO BACKENDS ***"); ports.clear(); SetNextResolution(ports); grpc_connectivity_state channel_state; @@ -1084,15 +1080,14 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { } while (channel_state == GRPC_CHANNEL_READY); ASSERT_NE(channel_state, GRPC_CHANNEL_READY); servers_[0]->service_.ResetCounters(); - // Next update introduces servers_[1], making the channel recover. + gpr_log(GPR_INFO, "*** BACK TO SECOND BACKEND ***"); ports.clear(); ports.emplace_back(servers_[1]->port_); SetNextResolution(ports); WaitForServer(stub, 1, DEBUG_LOCATION); channel_state = channel->GetState(false /* try to connect */); ASSERT_EQ(channel_state, GRPC_CHANNEL_READY); - // Check LB policy name for the channel. EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName()); } @@ -1211,8 +1206,9 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { auto channel = BuildChannel("round_robin"); auto stub = BuildStub(channel); SetNextResolution(ports); - for (size_t i = 0; i < kNumServers; ++i) + for (size_t i = 0; i < kNumServers; ++i) { WaitForServer(stub, i, DEBUG_LOCATION); + } for (size_t i = 0; i < servers_.size(); ++i) { CheckRpcSendOk(stub, DEBUG_LOCATION); EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; @@ -1236,7 +1232,6 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) { // No requests have gone to the deceased server. EXPECT_EQ(pre_death, post_death); // Bring the first server back up. - servers_[0].reset(new ServerData(ports[0])); StartServer(0); // Requests should start arriving at the first server either right away (if // the server managed to start before the RR policy retried the subchannel) or @@ -1360,6 +1355,52 @@ TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingInhibitPerChannel) { // Second channel should be READY. EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); CheckRpcSendOk(stub2, DEBUG_LOCATION); + // Enable health checks on the backend and wait for channel 1 to succeed. + servers_[0]->SetServingStatus("health_check_service_name", true); + CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */); + // Check that we created only one subchannel to the backend. + EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); + // Clean up. + EnableDefaultHealthCheckService(false); +} + +TEST_F(ClientLbEnd2endTest, RoundRobinWithHealthCheckingServiceNamePerChannel) { + EnableDefaultHealthCheckService(true); + // Start server. + const int kNumServers = 1; + StartServers(kNumServers); + // Create a channel with health-checking enabled. + ChannelArguments args; + args.SetServiceConfigJSON( + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name\"}}"); + auto channel1 = BuildChannel("round_robin", args); + auto stub1 = BuildStub(channel1); + std::vector ports = GetServersPorts(); + SetNextResolution(ports); + // Create a channel with health-checking enabled with a different + // service name. + ChannelArguments args2; + args2.SetServiceConfigJSON( + "{\"healthCheckConfig\": " + "{\"serviceName\": \"health_check_service_name2\"}}"); + auto channel2 = BuildChannel("round_robin", args2); + auto stub2 = BuildStub(channel2); + SetNextResolution(ports); + // Allow health checks from channel 2 to succeed. + servers_[0]->SetServingStatus("health_check_service_name2", true); + // First channel should not become READY, because health checks should be + // failing. + EXPECT_FALSE(WaitForChannelReady(channel1.get(), 1)); + CheckRpcSendFailure(stub1); + // Second channel should be READY. + EXPECT_TRUE(WaitForChannelReady(channel2.get(), 1)); + CheckRpcSendOk(stub2, DEBUG_LOCATION); + // Enable health checks for channel 1 and wait for it to succeed. + servers_[0]->SetServingStatus("health_check_service_name", true); + CheckRpcSendOk(stub1, DEBUG_LOCATION, true /* wait_for_ready */); + // Check that we created only one subchannel to the backend. + EXPECT_EQ(1UL, servers_[0]->service_.clients().size()); // Clean up. EnableDefaultHealthCheckService(false); }