From c6811f0fb758cc38c86b654b27d91665c6402e18 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Tue, 7 Jun 2022 16:01:38 -0700 Subject: [PATCH] Revert "Revert "EventEngine::RunAt - Subchannel connection retries"" (#29905) * Revert "Revert "EventEngine::RunAt - Subchannel connection retries (#29744)" (#29899)" This reverts commit d89d42d6a0fe3a06fa89c3fc8ef9ec685a865a5d. * fix bugs * add missing include * fix formatting * fix unused headers * adding grpc_init/grpc_shutdown to subchannel ctor/dtor --- BUILD | 2 + include/grpc/event_engine/event_engine.h | 5 ++ .../ext/filters/client_channel/subchannel.cc | 55 +++++++++++++++---- .../ext/filters/client_channel/subchannel.h | 9 ++- 4 files changed, 54 insertions(+), 17 deletions(-) diff --git a/BUILD b/BUILD index e074582d38d..5d7e5bee491 100644 --- a/BUILD +++ b/BUILD @@ -3067,6 +3067,7 @@ grpc_cc_library( "absl/strings", "absl/strings:cord", "absl/strings:str_format", + "absl/time", "absl/types:optional", "absl/types:variant", "absl/status", @@ -3085,6 +3086,7 @@ grpc_cc_library( "config", "construct_destruct", "debug_location", + "default_event_engine_factory_hdrs", "dual_ref_counted", "error", "gpr_base", diff --git a/include/grpc/event_engine/event_engine.h b/include/grpc/event_engine/event_engine.h index 0454ce921c1..dfd6e7f5c20 100644 --- a/include/grpc/event_engine/event_engine.h +++ b/include/grpc/event_engine/event_engine.h @@ -410,6 +410,11 @@ class EventEngine { /// If the associated callback has not been scheduled to run, it will be /// cancelled, and the associated std::function or \a Closure* will not be /// executed. In this case, Cancel will return true. + /// + /// Implementation note: closures should be destroyed in a timely manner after + /// execution or cancellation (milliseconds), since any state bound to the + /// closure may need to be destroyed for things to progress (e.g., if a + /// closure holds a ref to some ref-counted object).
virtual bool Cancel(TaskHandle handle) = 0; }; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0bb87d784cb..64c76d8ae06 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -28,7 +28,10 @@ #include #include "absl/status/statusor.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include #include #include #include @@ -49,6 +52,7 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/event_engine_factory.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -81,6 +85,7 @@ GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { +using ::grpc_event_engine::experimental::GetDefaultEventEngine; TraceFlag grpc_trace_subchannel(false, "subchannel"); DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount"); @@ -644,10 +649,18 @@ Subchannel::Subchannel(SubchannelKey key, pollset_set_(grpc_pollset_set_create()), connector_(std::move(connector)), backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_)) { + // A grpc_init is added here to ensure that grpc_shutdown does not happen + // until the subchannel is destroyed. Subchannels can persist longer than + // channels because they may be reused/shared among multiple channels. As a + // result the subchannel destruction happens asynchronously to channel + // destruction. If the last channel destruction triggers a grpc_shutdown + // before the last subchannel destruction, then there may be race conditions + // triggering segmentation faults. To prevent this issue, we call a grpc_init + // here and a grpc_shutdown in the subchannel destructor.
+ grpc_init(); GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr); // Check proxy mapper to determine address to connect to and channel // args to use. address_for_connect_ = key_.address(); @@ -693,6 +706,8 @@ Subchannel::~Subchannel() { grpc_channel_args_destroy(args_); connector_.reset(); grpc_pollset_set_destroy(pollset_set_); + // grpc_shutdown is called here because grpc_init is called in the ctor. + grpc_shutdown(); } RefCountedPtr Subchannel::Create( @@ -784,10 +799,15 @@ void Subchannel::RequestConnection() { } void Subchannel::ResetBackoff() { + // Hold a ref to ensure cancellation and subsequent deletion of the closure + // does not eliminate the last ref and destroy the Subchannel before the + // method returns. + auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff"); MutexLock lock(&mu_); backoff_.Reset(); - if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_timer_cancel(&retry_timer_); + if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && + GetDefaultEventEngine()->Cancel(retry_timer_handle_)) { + OnRetryTimerLocked(); } else if (state_ == GRPC_CHANNEL_CONNECTING) { next_attempt_time_ = ExecCtx::Get()->Now(); } @@ -872,13 +892,9 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, health_watcher_map_.NotifyLocked(state, status); } -void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) { - WeakRefCountedPtr c(static_cast(arg)); - { - MutexLock lock(&c->mu_); - c->OnRetryTimerLocked(); - } - c.reset(DEBUG_LOCATION, "RetryTimer"); +void Subchannel::OnRetryTimer() { + MutexLock lock(&mu_); + OnRetryTimerLocked(); } void Subchannel::OnRetryTimerLocked() { @@ -938,6 +954,8 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { if (connecting_result_.transport == nullptr || !PublishTransportLocked()) { const Duration time_until_next_attempt = 
next_attempt_time_ - ExecCtx::Get()->Now(); + auto ee_deadline = + absl::Now() + absl::Milliseconds(time_until_next_attempt.millis()); gpr_log(GPR_INFO, "subchannel %p %s: connect failed (%s), backing off for %" PRId64 " ms", @@ -945,8 +963,21 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) { time_until_next_attempt.millis()); SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error)); - WeakRef(DEBUG_LOCATION, "RetryTimer").release(); // Ref held by callback. - grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_); + retry_timer_handle_ = GetDefaultEventEngine()->RunAt( + ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable { + { + ApplicationCallbackExecCtx callback_exec_ctx; + ExecCtx exec_ctx; + self->OnRetryTimer(); + // Subchannel deletion might require an active ExecCtx. So if + // self.reset() is not called here, the WeakRefCountedPtr destructor + // may run after the ExecCtx declared in the callback is destroyed. + // Since subchannel may get destroyed when the WeakRefCountedPtr + // destructor runs, it may not have an active ExecCtx - thus leading + // to crashes. 
+ self.reset(); + } + }); } (void)GRPC_ERROR_UNREF(error); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 24ac11a08a3..9a9ff817c3d 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -29,6 +29,7 @@ #include "absl/status/status.h" #include "absl/types/optional.h" +#include #include #include @@ -53,7 +54,6 @@ #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/resolved_address.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/transport/connectivity_state.h" @@ -355,8 +355,7 @@ class Subchannel : public DualRefCounted { ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // Methods for connection. - static void OnRetryTimer(void* arg, grpc_error_handle error) - ABSL_LOCKS_EXCLUDED(mu_); + void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_); void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); static void OnConnectingFinished(void* arg, grpc_error_handle error) @@ -414,8 +413,8 @@ class Subchannel : public DualRefCounted { // Backoff state. BackOff backoff_ ABSL_GUARDED_BY(mu_); Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_); - grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_); - grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_); + grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_ + ABSL_GUARDED_BY(mu_); // Keepalive time period (-1 for unset) int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;