diff --git a/src/core/client_channel/client_channel.cc b/src/core/client_channel/client_channel.cc index 4bc8e44b333..29ea04b3f19 100644 --- a/src/core/client_channel/client_channel.cc +++ b/src/core/client_channel/client_channel.cc @@ -688,10 +688,93 @@ grpc_connectivity_state ClientChannel::CheckConnectivityState( return state; } -void ClientChannel::WatchConnectivityState(grpc_connectivity_state, Timestamp, - grpc_completion_queue*, void*) { - // TODO(ctiller): implement - Crash("not implemented"); +namespace { + +// A fire-and-forget object to handle external connectivity state watches. +class ExternalStateWatcher : public RefCounted { + public: + ExternalStateWatcher(WeakRefCountedPtr channel, + grpc_completion_queue* cq, void* tag, + grpc_connectivity_state last_observed_state, + Timestamp deadline) + : channel_(std::move(channel)), cq_(cq), tag_(tag) { + MutexLock lock(&mu_); + // Start watch. This inherits the ref from creation. + auto watcher = + MakeOrphanable(RefCountedPtr(this)); + watcher_ = watcher.get(); + channel_->AddConnectivityWatcher(last_observed_state, std::move(watcher)); + // Start timer. This takes a second ref. + const Duration timeout = deadline - Timestamp::Now(); + timer_handle_ = + channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->MaybeStartCompletion( + absl::DeadlineExceededError( + "Timed out waiting for connection state change")); + // ExternalStateWatcher deletion might require an active ExecCtx. + self.reset(); + }); + } + + private: + class Watcher : public AsyncConnectivityStateWatcherInterface { + public: + explicit Watcher(RefCountedPtr external_state_watcher) + : external_state_watcher_(std::move(external_state_watcher)) {} + + void OnConnectivityStateChange(grpc_connectivity_state /*new_state*/, + const absl::Status& /*status*/) override { + external_state_watcher_->MaybeStartCompletion(absl::OkStatus()); + } + + private: + RefCountedPtr external_state_watcher_; + }; + + // This is called both when the watch reports a new connectivity state + // and when the timer fires. It will trigger a CQ notification only + // on the first call. Subsequent calls will be ignored, because + // events can come in asynchronously. + void MaybeStartCompletion(absl::Status status) { + MutexLock lock(&mu_); + if (watcher_ == nullptr) return; // Ignore subsequent notifications. + // Cancel watch. + channel_->RemoveConnectivityWatcher(watcher_); + watcher_ = nullptr; + // Cancel timer. + channel_->event_engine()->Cancel(timer_handle_); + // Send CQ completion. + Ref().release(); // Released in FinishedCompletion(). + grpc_cq_end_op(cq_, tag_, status, FinishedCompletion, this, + &completion_storage_); + } + + // Called when the completion is returned to the CQ. + static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) { + auto* self = static_cast(arg); + self->Unref(); + } + + WeakRefCountedPtr channel_; + + Mutex mu_; + grpc_completion_queue* cq_ ABSL_GUARDED_BY(&mu_); + void* tag_ ABSL_GUARDED_BY(&mu_); + grpc_cq_completion completion_storage_ ABSL_GUARDED_BY(&mu_); + Watcher* watcher_ ABSL_GUARDED_BY(&mu_) = nullptr; + grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_ + ABSL_GUARDED_BY(&mu_); +}; + +} // namespace + +void ClientChannel::WatchConnectivityState( + grpc_connectivity_state state, Timestamp deadline, + grpc_completion_queue* cq, void* tag) { + new ExternalStateWatcher(WeakRefAsSubclass(), cq, tag, state, + deadline); } void ClientChannel::AddConnectivityWatcher(