From 490be929b34e7ddfa419e010ada0bab70c329837 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 2 Oct 2019 10:56:23 -0700 Subject: [PATCH] Use more normal API for client channel connectivity watches from internal code --- .../filters/client_channel/client_channel.cc | 100 ++++++++++++++++++ .../filters/client_channel/client_channel.h | 30 ++++-- .../client_channel/lb_policy/xds/xds.cc | 94 ++++++++-------- src/core/lib/transport/connectivity_state.cc | 7 +- src/core/lib/transport/connectivity_state.h | 8 ++ 5 files changed, 177 insertions(+), 62 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 70746f4783f..923410846ed 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -179,9 +179,17 @@ class ChannelData { return static_cast(external_watchers_.size()); } + void AddConnectivityWatcher( + grpc_connectivity_state initial_state, + OrphanablePtr watcher); + void RemoveConnectivityWatcher( + AsyncConnectivityStateWatcherInterface* watcher); + private: class SubchannelWrapper; class ClientChannelControlHelper; + class ConnectivityWatcherAdder; + class ConnectivityWatcherRemover; // Represents a pending connectivity callback from an external caller // via grpc_client_channel_watch_connectivity_state(). @@ -1201,6 +1209,72 @@ void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked( self->chand_->state_tracker_.RemoveWatcher(self); } +// +// ChannelData::ConnectivityWatcherAdder +// + +class ChannelData::ConnectivityWatcherAdder { + public: + ConnectivityWatcherAdder( + ChannelData* chand, grpc_connectivity_state initial_state, + OrphanablePtr watcher) + : chand_(chand), + initial_state_(initial_state), + watcher_(std::move(watcher)) { + GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder"); + GRPC_CLOSURE_INIT(&closure_, &ConnectivityWatcherAdder::AddWatcherLocked, + this, grpc_combiner_scheduler(chand_->combiner_)); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + } + + private: + static void AddWatcherLocked(void* arg, grpc_error* error) { + ConnectivityWatcherAdder* self = + static_cast(arg); + self->chand_->state_tracker_.AddWatcher(self->initial_state_, + std::move(self->watcher_)); + GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, + "ConnectivityWatcherAdder"); + Delete(self); + } + + ChannelData* chand_; + grpc_connectivity_state initial_state_; + OrphanablePtr watcher_; + grpc_closure closure_; +}; + +// +// ChannelData::ConnectivityWatcherRemover +// + +class ChannelData::ConnectivityWatcherRemover { + public: + ConnectivityWatcherRemover(ChannelData* chand, + AsyncConnectivityStateWatcherInterface* watcher) + : chand_(chand), watcher_(watcher) { + GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover"); + GRPC_CLOSURE_INIT(&closure_, + &ConnectivityWatcherRemover::RemoveWatcherLocked, this, + grpc_combiner_scheduler(chand_->combiner_)); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + } + + private: + static void RemoveWatcherLocked(void* arg, grpc_error* error) { + ConnectivityWatcherRemover* self = + static_cast(arg); + self->chand_->state_tracker_.RemoveWatcher(self->watcher_); + GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_, + "ConnectivityWatcherRemover"); + Delete(self); + } + + ChannelData* chand_; + AsyncConnectivityStateWatcherInterface* watcher_; + grpc_closure closure_; +}; + // // ChannelData::ClientChannelControlHelper // @@ -1885,6 +1959,17 @@ grpc_connectivity_state ChannelData::CheckConnectivityState( return out; } +void ChannelData::AddConnectivityWatcher( + grpc_connectivity_state initial_state, + OrphanablePtr watcher) { + New(this, initial_state, std::move(watcher)); +} + +void ChannelData::RemoveConnectivityWatcher( + AsyncConnectivityStateWatcherInterface* watcher) { + New(this, watcher); +} + // // CallData implementation // @@ -3936,6 +4021,21 @@ void grpc_client_channel_watch_connectivity_state( watcher_timer_init); } +void grpc_client_channel_start_connectivity_watch( + grpc_channel_element* elem, grpc_connectivity_state initial_state, + grpc_core::OrphanablePtr + watcher) { + auto* chand = static_cast(elem->channel_data); + chand->AddConnectivityWatcher(initial_state, std::move(watcher)); +} + +void grpc_client_channel_stop_connectivity_watch( + grpc_channel_element* elem, + grpc_core::AsyncConnectivityStateWatcherInterface* watcher) { + auto* chand = static_cast(elem->channel_data); + chand->RemoveConnectivityWatcher(watcher); +} + grpc_core::RefCountedPtr grpc_client_channel_get_subchannel_call(grpc_call_element* elem) { auto* calld = static_cast(elem->call_data); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 72bcb404ce0..2ba2b87bb13 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -46,17 +46,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( int grpc_client_channel_num_external_connectivity_watchers( grpc_channel_element* elem); -// TODO(roth): This function is used both when handling external -// connectivity watchers and for LB policies like grpclb and xds that -// contain nested channels. In the latter case, we ideally want -// something closer to the normal connectivity state tracker API. -// When we have time, consider refactoring this somehow to allow each -// use-case to be handled more cleanly. +// Starts a one-time connectivity state watch. When the channel's state +// becomes different from *state, sets *state to the new state and +// schedules on_complete. The watcher_timer_init callback is invoked as +// soon as the watch is actually started (i.e., after hopping into the +// client channel combiner). I/O will be serviced via pollent. +// +// This is intended to be used when starting a watch from outside of C-core +// via grpc_channel_watch_connectivity_state(). It should not be used +// by other callers. void grpc_client_channel_watch_connectivity_state( grpc_channel_element* elem, grpc_polling_entity pollent, grpc_connectivity_state* state, grpc_closure* on_complete, grpc_closure* watcher_timer_init); +// Starts and stops a connectivity watch. The watcher will be initially +// notified as soon as the state changes from initial_state and then on +// every subsequent state change until either the watch is stopped or +// it is notified that the state has changed to SHUTDOWN. +// +// This is intended to be used when starting watches from code inside of +// C-core (e.g., for a nested control plane channel for things like xds). +void grpc_client_channel_start_connectivity_watch( + grpc_channel_element* elem, grpc_connectivity_state initial_state, + grpc_core::OrphanablePtr + watcher); +void grpc_client_channel_stop_connectivity_watch( + grpc_channel_element* elem, + grpc_core::AsyncConnectivityStateWatcherInterface* watcher); + /* Debug helper: pull the subchannel call from a call stack element */ grpc_core::RefCountedPtr grpc_client_channel_get_subchannel_call(grpc_call_element* elem); diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index e0fa6c31dcd..ee6578a2fb0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -355,17 +355,17 @@ class XdsLb : public LoadBalancingPolicy { void StartConnectivityWatchLocked(); void CancelConnectivityWatchLocked(); - static void OnConnectivityChangedLocked(void* arg, grpc_error* error); private: + class StateWatcher; + // The owning LB policy. RefCountedPtr xdslb_policy_; // The channel and its status. grpc_channel* channel_; bool shutting_down_ = false; - grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE; - grpc_closure on_connectivity_changed_; + StateWatcher* watcher_ = nullptr; // The retryable XDS calls to the LB server. OrphanablePtr> eds_calld_; @@ -862,6 +862,39 @@ void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity, parent_->channel_control_helper()->AddTraceEvent(severity, message); } +// +// XdsLb::LbChannelState::StateWatcher +// + +class XdsLb::LbChannelState::StateWatcher + : public AsyncConnectivityStateWatcherInterface { + public: + explicit StateWatcher(RefCountedPtr parent) + : AsyncConnectivityStateWatcherInterface( + grpc_combiner_scheduler(parent->xdslb_policy_->combiner())), + parent_(std::move(parent)) {} + + private: + void OnConnectivityStateChange(grpc_connectivity_state new_state) override { + if (!parent_->shutting_down_ && + parent_->xdslb_policy_->fallback_at_startup_checks_pending_ && + new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { + // In TRANSIENT_FAILURE. Cancel the fallback timer and go into + // fallback mode immediately. + gpr_log(GPR_INFO, + "[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; " + "entering fallback mode", + parent_->xdslb_policy_.get()); + parent_->xdslb_policy_->fallback_at_startup_checks_pending_ = false; + grpc_timer_cancel(&parent_->xdslb_policy_->lb_fallback_timer_); + parent_->xdslb_policy_->UpdateFallbackPolicyLocked(); + parent_->CancelConnectivityWatchLocked(); + } + } + + RefCountedPtr parent_; +}; + // // XdsLb::LbChannelState // @@ -871,8 +904,6 @@ XdsLb::LbChannelState::LbChannelState(RefCountedPtr xdslb_policy, const grpc_channel_args& args) : InternallyRefCounted(&grpc_lb_xds_trace), xdslb_policy_(std::move(xdslb_policy)) { - GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked, - this, grpc_combiner_scheduler(xdslb_policy_->combiner())); channel_ = CreateXdsBalancerChannel(balancer_name, args); GPR_ASSERT(channel_ != nullptr); eds_calld_.reset(New>( @@ -900,56 +931,17 @@ void XdsLb::LbChannelState::StartConnectivityWatchLocked() { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - // Ref held by callback. - Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release(); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - xdslb_policy_->interested_parties()), - &connectivity_, &on_connectivity_changed_, nullptr); + auto watcher = MakeOrphanable(Ref()); + watcher_ = watcher.get(); + grpc_client_channel_start_connectivity_watch( + client_channel_elem, GRPC_CHANNEL_IDLE, std::move(watcher)); } void XdsLb::LbChannelState::CancelConnectivityWatchLocked() { grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_)); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - xdslb_policy_->interested_parties()), - nullptr, &on_connectivity_changed_, nullptr); -} - -void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg, - grpc_error* error) { - LbChannelState* self = static_cast(arg); - if (!self->shutting_down_ && - self->xdslb_policy_->fallback_at_startup_checks_pending_) { - if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { - // Not in TRANSIENT_FAILURE. Renew connectivity watch. - grpc_channel_element* client_channel_elem = - grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(self->channel_)); - GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); - grpc_client_channel_watch_connectivity_state( - client_channel_elem, - grpc_polling_entity_create_from_pollset_set( - self->xdslb_policy_->interested_parties()), - &self->connectivity_, &self->on_connectivity_changed_, nullptr); - return; // Early out so we don't drop the ref below. - } - // In TRANSIENT_FAILURE. Cancel the fallback timer and go into - // fallback mode immediately. - gpr_log(GPR_INFO, - "[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; " - "entering fallback mode", - self); - self->xdslb_policy_->fallback_at_startup_checks_pending_ = false; - grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_); - self->xdslb_policy_->UpdateFallbackPolicyLocked(); - } - // Done watching connectivity state, so drop ref. - self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done"); + grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_); } // @@ -1843,9 +1835,7 @@ void XdsLb::ShutdownLocked() { gpr_log(GPR_INFO, "[xdslb %p] shutting down", this); } shutting_down_ = true; - if (fallback_at_startup_checks_pending_) { - grpc_timer_cancel(&lb_fallback_timer_); - } + MaybeCancelFallbackAtStartupChecks(); priority_list_.ShutdownLocked(); if (fallback_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(), diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 32aa99306be..ec60a1125f8 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -57,10 +57,9 @@ const char* ConnectivityStateName(grpc_connectivity_state state) { class AsyncConnectivityStateWatcherInterface::Notifier { public: Notifier(RefCountedPtr watcher, - grpc_connectivity_state state) + grpc_connectivity_state state, grpc_closure_scheduler* scheduler) : watcher_(std::move(watcher)), state_(state) { - GRPC_CLOSURE_INIT(&closure_, SendNotification, this, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&closure_, SendNotification, this, scheduler); GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); } @@ -82,7 +81,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier { void AsyncConnectivityStateWatcherInterface::Notify( grpc_connectivity_state state) { - New(Ref(), state); // Deletes itself when done. + New(Ref(), state, scheduler_); // Deletes itself when done. } // diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index cefd1b88a02..2a4cf63de29 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -28,6 +28,7 @@ #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { @@ -67,8 +68,15 @@ class AsyncConnectivityStateWatcherInterface protected: class Notifier; + explicit AsyncConnectivityStateWatcherInterface( + grpc_closure_scheduler* scheduler = grpc_schedule_on_exec_ctx) + : scheduler_(scheduler) {} + // Invoked asynchronously when Notify() is called. virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0; + + private: + grpc_closure_scheduler* scheduler_; }; // Tracks connectivity state. Maintains a list of watchers that are