Revert "EventEngine::RunAt - Subchannel connection retries (#29744)" (#29899)

This reverts commit f38ef257d2.
pull/29894/head
Vignesh Babu 3 years ago committed by GitHub
parent ad4a387403
commit d89d42d6a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 5
      include/grpc/event_engine/event_engine.h
  3. 34
      src/core/ext/filters/client_channel/subchannel.cc
  4. 9
      src/core/ext/filters/client_channel/subchannel.h

@ -3054,7 +3054,6 @@ grpc_cc_library(
"absl/strings",
"absl/strings:cord",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"absl/types:variant",
"absl/status",
@ -3071,7 +3070,6 @@ grpc_cc_library(
"chunked_vector",
"config",
"debug_location",
"default_event_engine_factory_hdrs",
"dual_ref_counted",
"error",
"gpr_base",

@ -410,11 +410,6 @@ class EventEngine {
/// If the associated callback has not been scheduled to run, it will be
/// cancelled, and the associated std::function or \a Closure* will not be
/// executed. In this case, Cancel will return true.
///
/// Implementation note: closures should be destroyed in a timely manner after
/// execution or cancelliation (milliseconds), since any state bound to the
/// closure may need to be destroyed for things to progress (e.g., if a
/// closure holds a ref to some ref-counted object).
virtual bool Cancel(TaskHandle handle) = 0;
};

@ -29,8 +29,6 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/slice.h>
#include <grpc/status.h>
@ -52,7 +50,6 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -85,7 +82,6 @@
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
namespace grpc_core {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -652,6 +648,7 @@ Subchannel::Subchannel(SubchannelKey key,
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr);
// Check proxy mapper to determine address to connect to and channel
// args to use.
address_for_connect_ = key_.address();
@ -788,15 +785,10 @@ void Subchannel::RequestConnection() {
}
void Subchannel::ResetBackoff() {
// Hold a ref to ensure cancellation and subsequent deletion of the closure
// does not eliminate the last ref and destroy the Subchannel before the
// method returns.
auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
MutexLock lock(&mu_);
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_timer_cancel(&retry_timer_);
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
}
@ -881,9 +873,13 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
health_watcher_map_.NotifyLocked(state, status);
}
void Subchannel::OnRetryTimer() {
MutexLock lock(&mu_);
OnRetryTimerLocked();
void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) {
WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
{
MutexLock lock(&c->mu_);
c->OnRetryTimerLocked();
}
c.reset(DEBUG_LOCATION, "RetryTimer");
}
void Subchannel::OnRetryTimerLocked() {
@ -943,8 +939,6 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
auto ee_deadline =
absl::Now() + absl::Milliseconds(time_until_next_attempt.millis());
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",
@ -952,12 +946,8 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
retry_timer_handle_ = GetDefaultEventEngine()->RunAt(
ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnRetryTimer();
});
WeakRef(DEBUG_LOCATION, "RetryTimer").release(); // Ref held by callback.
grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_);
}
(void)GRPC_ERROR_UNREF(error);
}

@ -29,7 +29,6 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -54,6 +53,7 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -355,7 +355,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Methods for connection.
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
static void OnRetryTimer(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_);
void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void OnConnectingFinished(void* arg, grpc_error_handle error)
@ -413,8 +414,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Backoff state.
BackOff backoff_ ABSL_GUARDED_BY(mu_);
Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_
ABSL_GUARDED_BY(mu_);
grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_);
// Keepalive time period (-1 for unset)
int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;

Loading…
Cancel
Save