EventEngine::RunAfter migration StateWatcher (#31790)

* EventEngine::RunAfter migration StateWatcher

initial commit

* comment

* ref-count fun

* ExecCtx fun

* more ExecCtx fun

* fix

* more ref-counting fun

* remove debug log

* add mutex
pull/31805/head
Yijie Ma 2 years ago committed by GitHub
parent 1e13612d4a
commit 044e0557a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 56
      src/core/ext/filters/client_channel/channel_connectivity.cc

@ -2665,6 +2665,7 @@ grpc_cc_library(
"absl/status:statusor",
"absl/strings",
"absl/strings:cord",
"absl/synchronization",
"absl/types:optional",
"absl/types:variant",
"upb_lib",

@ -18,8 +18,12 @@
#include <inttypes.h>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/gpr_types.h>
@ -37,7 +41,6 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
@ -112,7 +115,6 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
state_(last_observed_state) {
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&on_complete_, WatchComplete, this, nullptr);
GRPC_CLOSURE_INIT(&on_timeout_, TimeoutComplete, this, nullptr);
ClientChannel* client_channel =
ClientChannel::GetFromChannel(channel_.get());
if (client_channel == nullptr) {
@ -122,8 +124,11 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
// will always be TRANSIENT_FAILURE), so we don't actually start a
// watch, but we are hiding that fact from the application.
if (IsLameChannel(channel_.get())) {
// Ref from object creation is held by timer callback.
// A ref is held by the timer callback.
StartTimer(Timestamp::FromTimespecRoundUp(deadline));
// The ref from object creation needs to be released here, since a lame
// channel does not have a watcher.
Unref();
return;
}
gpr_log(GPR_ERROR,
@ -131,10 +136,7 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
"something that is not a client channel");
GPR_ASSERT(false);
}
// Take an additional ref, so we have two (the first one is from the
// creation of this object). One will be held by the timer callback,
// the other by the watcher callback.
Ref().release();
// Ref from object creation is held by the watcher callback.
auto* watcher_timer_init_state = new WatcherTimerInitState(
this, Timestamp::FromTimespecRoundUp(deadline));
client_channel->AddExternalConnectivityWatcher(
@ -167,7 +169,16 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
};
void StartTimer(Timestamp deadline) {
grpc_timer_init(&timer_, deadline, &on_timeout_);
const Duration timeout = deadline - Timestamp::Now();
absl::MutexLock lock(&mu_);
timer_handle_ = channel_->channel_stack()->EventEngine()->RunAfter(
timeout, [self = Ref()]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->TimeoutComplete();
// StateWatcher deletion might require an active ExecCtx.
self.reset();
});
}
static void WatchComplete(void* arg, grpc_error_handle error) {
@ -175,20 +186,27 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
GRPC_LOG_IF_ERROR("watch_completion_error", error);
}
grpc_timer_cancel(&self->timer_);
{
absl::MutexLock lock(&self->mu_);
if (self->timer_handle_.has_value()) {
self->channel_->channel_stack()->EventEngine()->Cancel(
*self->timer_handle_);
}
}
// The watcher fires when it is either notified or cancelled; either way, the
// state of this watcher has already been cleared from the client channel, so
// there is no need to cancel the watch again.
self->Unref();
}
static void TimeoutComplete(void* arg, grpc_error_handle error) {
auto* self = static_cast<StateWatcher*>(arg);
self->timer_fired_ = error.ok();
void TimeoutComplete() {
timer_fired_ = true;
// If this is a client channel (not a lame channel), cancel the watch.
ClientChannel* client_channel =
ClientChannel::GetFromChannel(self->channel_.get());
ClientChannel::GetFromChannel(channel_.get());
if (client_channel != nullptr) {
client_channel->CancelExternalConnectivityWatcher(&self->on_complete_);
client_channel->CancelExternalConnectivityWatcher(&on_complete_);
}
self->Unref();
}
// Invoked when both strong refs are released.
@ -217,9 +235,13 @@ class StateWatcher : public DualRefCounted<StateWatcher> {
grpc_cq_completion completion_storage_;
grpc_closure on_complete_;
grpc_timer timer_;
grpc_closure on_timeout_;
// timer_handle_ might be accessed in parallel from multiple threads, e.g.
// when the timer callback fires immediately on an event engine thread before
// RunAfter() returns.
absl::Mutex mu_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);
bool timer_fired_ = false;
};

Loading…
Cancel
Save