From 044e0557a7d9118d56df1cd7be07406fda0dc720 Mon Sep 17 00:00:00 2001 From: Yijie Ma <5663878+yijiem@users.noreply.github.com> Date: Fri, 2 Dec 2022 18:39:42 -0800 Subject: [PATCH] EventEngine::RunAfter migration StateWatcher (#31790) * EventEngine::RunAfter migration StateWatcher initial commit * comment * ref-count fun * ExecCtx fun * more ExecCtx fun * fix * more ref-counting fun * remove debug log * add mutex --- BUILD | 1 + .../client_channel/channel_connectivity.cc | 56 +++++++++++++------ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/BUILD b/BUILD index 74bd041822b..8a279fae625 100644 --- a/BUILD +++ b/BUILD @@ -2665,6 +2665,7 @@ grpc_cc_library( "absl/status:statusor", "absl/strings", "absl/strings:cord", + "absl/synchronization", "absl/types:optional", "absl/types:variant", "upb_lib", diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index a75f404d5b2..5323805fd3e 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -18,8 +18,12 @@ #include +#include "absl/base/thread_annotations.h" #include "absl/status/status.h" +#include "absl/synchronization/mutex.h" +#include "absl/types/optional.h" +#include #include #include #include @@ -37,7 +41,6 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" @@ -112,7 +115,6 @@ class StateWatcher : public DualRefCounted { state_(last_observed_state) { GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr); - GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr); ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_.get()); if (client_channel == nullptr) { @@ -122,8 +124,11 @@ class StateWatcher : public DualRefCounted { // will always be TRANSIENT_FAILURE), so we don't actually start a // watch, but we are hiding that fact from the application. if (IsLameChannel(channel_.get())) { - // Ref from object creation is held by timer callback. + // A ref is held by the timer callback. StartTimer(Timestamp::FromTimespecRoundUp(deadline)); + // Ref from object creation needs to be freed here since lame channel + // does not have a watcher. + Unref(); return; } gpr_log(GPR_ERROR, @@ -131,10 +136,7 @@ class StateWatcher : public DualRefCounted { "something that is not a client channel"); GPR_ASSERT(false); } - // Take an addition ref, so we have two (the first one is from the - // creation of this object). One will be held by the timer callback, - // the other by the watcher callback. - Ref().release(); + // Ref from object creation is held by the watcher callback. auto* watcher_timer_init_state = new WatcherTimerInitState( this, Timestamp::FromTimespecRoundUp(deadline)); client_channel->AddExternalConnectivityWatcher( @@ -167,7 +169,16 @@ class StateWatcher : public DualRefCounted { }; void StartTimer(Timestamp deadline) { - grpc_timer_init(&timer_, deadline, &on_timeout_); + const Duration timeout = deadline - Timestamp::Now(); + absl::MutexLock lock(&mu_); + timer_handle_ = channel_->channel_stack()->EventEngine()->RunAfter( + timeout, [self = Ref()]() mutable { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->TimeoutComplete(); + // StateWatcher deletion might require an active ExecCtx. + self.reset(); + }); } static void WatchComplete(void* arg, grpc_error_handle error) { @@ -175,20 +186,27 @@ class StateWatcher : public DualRefCounted { if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) { GRPC_LOG_IF_ERROR("watch_completion_error", error); } - grpc_timer_cancel(&self->timer_); + { + absl::MutexLock lock(&self->mu_); + if (self->timer_handle_.has_value()) { + self->channel_->channel_stack()->EventEngine()->Cancel( + *self->timer_handle_); + } + } + // Watcher fired when either notified or cancelled, either way the state of + // this watcher has been cleared from the client channel. Thus there is no + // need to cancel the watch again. self->Unref(); } - static void TimeoutComplete(void* arg, grpc_error_handle error) { - auto* self = static_cast(arg); - self->timer_fired_ = error.ok(); + void TimeoutComplete() { + timer_fired_ = true; // If this is a client channel (not a lame channel), cancel the watch. ClientChannel* client_channel = - ClientChannel::GetFromChannel(self->channel_.get()); + ClientChannel::GetFromChannel(channel_.get()); if (client_channel != nullptr) { - client_channel->CancelExternalConnectivityWatcher(&self->on_complete_); + client_channel->CancelExternalConnectivityWatcher(&on_complete_); } - self->Unref(); } // Invoked when both strong refs are released. @@ -217,9 +235,13 @@ class StateWatcher : public DualRefCounted { grpc_cq_completion completion_storage_; grpc_closure on_complete_; - grpc_timer timer_; - grpc_closure on_timeout_; + // timer_handle_ might be accessed in parallel from multiple threads, e.g. + // timer callback fired immediately on an event engine thread before + // RunAfter() returns. + absl::Mutex mu_; + absl::optional + timer_handle_ ABSL_GUARDED_BY(mu_); bool timer_fired_ = false; };