Revert "Revert "EventEngine::RunAt - Subchannel connection retries"" (#29905)

* Revert "Revert "EventEngine::RunAt - Subchannel connection retries (#29744)" (#29899)"

This reverts commit d89d42d6a0.

* fix bugs

* add missing include

* fix formatting

* fix unused headers

* adding grpc_init/grpc_shutdown to subchannel ctor/dtor
pull/29945/head
Vignesh Babu 3 years ago committed by GitHub
parent 72e65b03cf
commit c6811f0fb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 5
      include/grpc/event_engine/event_engine.h
  3. 55
      src/core/ext/filters/client_channel/subchannel.cc
  4. 9
      src/core/ext/filters/client_channel/subchannel.h

@ -3067,6 +3067,7 @@ grpc_cc_library(
"absl/strings",
"absl/strings:cord",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"absl/types:variant",
"absl/status",
@ -3085,6 +3086,7 @@ grpc_cc_library(
"config",
"construct_destruct",
"debug_location",
"default_event_engine_factory_hdrs",
"dual_ref_counted",
"error",
"gpr_base",

@ -410,6 +410,11 @@ class EventEngine {
/// If the associated callback has not been scheduled to run, it will be
/// cancelled, and the associated std::function or \a Closure* will not be
/// executed. In this case, Cancel will return true.
///
/// Implementation note: closures should be destroyed in a timely manner after
/// execution or cancelliation (milliseconds), since any state bound to the
/// closure may need to be destroyed for things to progress (e.g., if a
/// closure holds a ref to some ref-counted object).
virtual bool Cancel(TaskHandle handle) = 0;
};

@ -28,7 +28,10 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
@ -49,6 +52,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -81,6 +85,7 @@
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
namespace grpc_core {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -644,10 +649,18 @@ Subchannel::Subchannel(SubchannelKey key,
pollset_set_(grpc_pollset_set_create()),
connector_(std::move(connector)),
backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_)) {
// A grpc_init is added here to ensure that grpc_shutdown does not happen
// until the subchannel is destroyed. Subchannels can persist longer than
// channels because they maybe reused/shared among multiple channels. As a
// result the subchannel destruction happens asynchronously to channel
// destruction. If the last channel destruction triggers a grpc_shutdown
// before the last subchannel destruction, then there maybe race conditions
// triggering segmentation faults. To prevent this issue, we call a grpc_init
// here and a grpc_shutdown in the subchannel destructor.
grpc_init();
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr);
// Check proxy mapper to determine address to connect to and channel
// args to use.
address_for_connect_ = key_.address();
@ -693,6 +706,8 @@ Subchannel::~Subchannel() {
grpc_channel_args_destroy(args_);
connector_.reset();
grpc_pollset_set_destroy(pollset_set_);
// grpc_shutdown is called here because grpc_init is called in the ctor.
grpc_shutdown();
}
RefCountedPtr<Subchannel> Subchannel::Create(
@ -784,10 +799,15 @@ void Subchannel::RequestConnection() {
}
void Subchannel::ResetBackoff() {
// Hold a ref to ensure cancellation and subsequent deletion of the closure
// does not eliminate the last ref and destroy the Subchannel before the
// method returns.
auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
MutexLock lock(&mu_);
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_timer_cancel(&retry_timer_);
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
}
@ -872,13 +892,9 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
health_watcher_map_.NotifyLocked(state, status);
}
void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) {
WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
{
MutexLock lock(&c->mu_);
c->OnRetryTimerLocked();
}
c.reset(DEBUG_LOCATION, "RetryTimer");
void Subchannel::OnRetryTimer() {
MutexLock lock(&mu_);
OnRetryTimerLocked();
}
void Subchannel::OnRetryTimerLocked() {
@ -938,6 +954,8 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
auto ee_deadline =
absl::Now() + absl::Milliseconds(time_until_next_attempt.millis());
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",
@ -945,8 +963,21 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
WeakRef(DEBUG_LOCATION, "RetryTimer").release(); // Ref held by callback.
grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_);
retry_timer_handle_ = GetDefaultEventEngine()->RunAt(
ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
{
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnRetryTimer();
// Subchannel deletion might require an active ExecCtx. So if
// self.reset() is not called here, the WeakRefCountedPtr destructor
// may run after the ExecCtx declared in the callback is destroyed.
// Since subchannel may get destroyed when the WeakRefCountedPtr
// destructor runs, it may not have an active ExecCtx - thus leading
// to crashes.
self.reset();
}
});
}
(void)GRPC_ERROR_UNREF(error);
}

@ -29,6 +29,7 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -53,7 +54,6 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -355,8 +355,7 @@ class Subchannel : public DualRefCounted<Subchannel> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Methods for connection.
static void OnRetryTimer(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_);
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void OnConnectingFinished(void* arg, grpc_error_handle error)
@ -414,8 +413,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Backoff state.
BackOff backoff_ ABSL_GUARDED_BY(mu_);
Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_
ABSL_GUARDED_BY(mu_);
// Keepalive time period (-1 for unset)
int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;

Loading…
Cancel
Save