diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 3cdacd79121..6d7465195d7 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -298,12 +298,12 @@ class XdsClient::ChannelState::StateWatcher : public AsyncConnectivityStateWatcherInterface { public: explicit StateWatcher(RefCountedPtr parent) - : AsyncConnectivityStateWatcherInterface( - grpc_combiner_scheduler(parent->xds_client()->combiner_)), + : AsyncConnectivityStateWatcherInterface(parent->xds_client()->combiner_), parent_(std::move(parent)) {} private: - void OnConnectivityStateChange(grpc_connectivity_state new_state) override { + void OnConnectivityStateChange( + grpc_connectivity_state new_state) override { if (!parent_->shutting_down_ && new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { // In TRANSIENT_FAILURE. Notify all watchers of error. diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 6c1b51b3e2d..84e33c79be9 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -57,10 +57,17 @@ const char* ConnectivityStateName(grpc_connectivity_state state) { class AsyncConnectivityStateWatcherInterface::Notifier { public: Notifier(RefCountedPtr watcher, - grpc_connectivity_state state, grpc_closure_scheduler* scheduler) + grpc_connectivity_state state, grpc_core::Combiner* combiner) : watcher_(std::move(watcher)), state_(state) { - GRPC_CLOSURE_INIT(&closure_, SendNotification, this, scheduler); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + if (combiner != nullptr) { + combiner->Run( + GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr), + GRPC_ERROR_NONE); + } else { + GRPC_CLOSURE_INIT(&closure_, SendNotification, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + } } private: @@ -81,7 +88,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier { void AsyncConnectivityStateWatcherInterface::Notify( grpc_connectivity_state state) { - New(Ref(), state, scheduler_); // Deletes itself when done. + New(Ref(), state, combiner_); // Deletes itself when done. } // diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 3895cc5c836..7aa8ef412d4 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -69,8 +69,8 @@ class AsyncConnectivityStateWatcherInterface class Notifier; explicit AsyncConnectivityStateWatcherInterface( - grpc_closure_scheduler* scheduler = grpc_schedule_on_exec_ctx) - : scheduler_(scheduler) {} + grpc_core::Combiner* combiner = nullptr) + : combiner_(combiner) {} // Invoked asynchronously when Notify() is called. virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;