EventEngine::RunAt - Subchannel connection retries (#29744)

* EventEngine::RunAt - Subchannel connection retries

* Fix refcounting on retry timer reset

* Automated change: Fix sanity tests

* fix autofixer's goof

* Squashed commit of the following:

commit d4aed9e615
Author: Craig Tiller <ctiller@google.com>
Date:   Fri May 20 12:58:47 2022 -0700

    Revert "[c++14] Remove Capture type (#29327)" (#29748)

    This reverts commit 944c0b2ce9.

commit 965feb5726
Author: apolcyn <apolcyn@google.com>
Date:   Fri May 20 12:45:33 2022 -0700

    xds: Remove aggregate and logical dns clusters env var guard (#29742)

    * Remove aggregate and logical dns clusters env var guard

commit d5c8bbce51
Author: Sergii Tkachenko <sergiitk@google.com>
Date:   Fri May 20 10:44:59 2022 -0700

    xds-k8s: Do not recommend enabling mesh certs by default (#29743)

    This should covered separately per this note:

    > For more details, and for the setup for security tests, see
    ["Setting up Traffic Director service security with proxyless gRPC"](https://cloud.google.com/traffic-director/docs/security-proxyless-setup)
     user guide.

commit 1df32ca680
Author: AJ Heller <hork@google.com>
Date:   Fri May 20 10:18:53 2022 -0700

    Delete the EventEngine-driven iomgr implementation (#29654)

    This code is not compiled by default and has fallen out of sync with the
    rest of the codebase. There's a good chance it won't be used, given our
    current work to use an iomgr-drive EventEngine instead.

    This code will continue to live in git history, should we need to bring
    pieces of it back.

commit 944c0b2ce9
Author: Craig Tiller <ctiller@google.com>
Date:   Fri May 20 09:56:23 2022 -0700

    [c++14] Remove Capture type (#29327)

    * Remove Capture type

    * Automated change: Fix sanity tests

    * update

    Co-authored-by: ctiller <ctiller@users.noreply.github.com>

commit fd744e081d
Author: Esun Kim <veblush@google.com>
Date:   Fri May 20 08:54:33 2022 -0700

    Removed manylinux2010 python artifacts (#29734)

    * Removed manylinux2010 python artifacts

    * Fix

    * Fix2

    * Added cp37 to presubmit test for distribtest relying on cp37 artifacts

commit 5051566b27
Author: Jan Tattermusch <jtattermusch@google.com>
Date:   Fri May 20 11:07:04 2022 +0200

    Enable remote cache for selected ObjC bazel tests. (#29731)

    * enable remote cache for ObjC bazel tests

    * add bazel RBE cache for mac ios bazel builds

* release lock before unreffing

* rm some manual WeakRef-counting

* comments

* verbiage

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
Co-authored-by: Vignesh Babu <vigneshbabu@google.com>
pull/28865/head^2
AJ Heller 3 years ago committed by GitHub
parent aacf0e252b
commit f38ef257d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 5
      include/grpc/event_engine/event_engine.h
  3. 34
      src/core/ext/filters/client_channel/subchannel.cc
  4. 9
      src/core/ext/filters/client_channel/subchannel.h

@ -3067,6 +3067,7 @@ grpc_cc_library(
"absl/strings",
"absl/strings:cord",
"absl/strings:str_format",
"absl/time",
"absl/types:optional",
"absl/types:variant",
"absl/status",
@ -3085,6 +3086,7 @@ grpc_cc_library(
"config",
"construct_destruct",
"debug_location",
"default_event_engine_factory_hdrs",
"dual_ref_counted",
"error",
"gpr_base",

@ -410,6 +410,11 @@ class EventEngine {
/// If the associated callback has not been scheduled to run, it will be
/// cancelled, and the associated std::function or \a Closure* will not be
/// executed. In this case, Cancel will return true.
///
/// Implementation note: closures should be destroyed in a timely manner after
/// execution or cancelliation (milliseconds), since any state bound to the
/// closure may need to be destroyed for things to progress (e.g., if a
/// closure holds a ref to some ref-counted object).
virtual bool Cancel(TaskHandle handle) = 0;
};

@ -28,6 +28,8 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/slice.h>
#include <grpc/status.h>
@ -49,6 +51,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/event_engine_factory.h"
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -81,6 +84,7 @@
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
namespace grpc_core {
using ::grpc_event_engine::experimental::GetDefaultEventEngine;
TraceFlag grpc_trace_subchannel(false, "subchannel");
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
@ -651,7 +655,6 @@ Subchannel::Subchannel(SubchannelKey key,
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, nullptr);
// Check proxy mapper to determine address to connect to and channel
// args to use.
address_for_connect_ = key_.address();
@ -801,10 +804,15 @@ void Subchannel::RequestConnection() {
}
void Subchannel::ResetBackoff() {
// Hold a ref to ensure cancellation and subsequent deletion of the closure
// does not eliminate the last ref and destroy the Subchannel before the
// method returns.
auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
MutexLock lock(&mu_);
backoff_.Reset();
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_timer_cancel(&retry_timer_);
if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
GetDefaultEventEngine()->Cancel(retry_timer_handle_)) {
OnRetryTimerLocked();
} else if (state_ == GRPC_CHANNEL_CONNECTING) {
next_attempt_time_ = ExecCtx::Get()->Now();
}
@ -889,13 +897,9 @@ void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
health_watcher_map_.NotifyLocked(state, status);
}
void Subchannel::OnRetryTimer(void* arg, grpc_error_handle /*error*/) {
WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
{
MutexLock lock(&c->mu_);
c->OnRetryTimerLocked();
}
c.reset(DEBUG_LOCATION, "RetryTimer");
void Subchannel::OnRetryTimer() {
MutexLock lock(&mu_);
OnRetryTimerLocked();
}
void Subchannel::OnRetryTimerLocked() {
@ -955,6 +959,8 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
const Duration time_until_next_attempt =
next_attempt_time_ - ExecCtx::Get()->Now();
auto ee_deadline =
absl::Now() + absl::Milliseconds(time_until_next_attempt.millis());
gpr_log(GPR_INFO,
"subchannel %p %s: connect failed (%s), backing off for %" PRId64
" ms",
@ -962,8 +968,12 @@ void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
time_until_next_attempt.millis());
SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error));
WeakRef(DEBUG_LOCATION, "RetryTimer").release(); // Ref held by callback.
grpc_timer_init(&retry_timer_, next_attempt_time_, &on_retry_timer_);
retry_timer_handle_ = GetDefaultEventEngine()->RunAt(
ee_deadline, [self = WeakRef(DEBUG_LOCATION, "RetryTimer")] {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
self->OnRetryTimer();
});
}
(void)GRPC_ERROR_UNREF(error);
}

@ -29,6 +29,7 @@
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
@ -53,7 +54,6 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -367,8 +367,7 @@ class Subchannel : public DualRefCounted<Subchannel> {
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
// Methods for connection.
static void OnRetryTimer(void* arg, grpc_error_handle error)
ABSL_LOCKS_EXCLUDED(mu_);
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
static void OnConnectingFinished(void* arg, grpc_error_handle error)
@ -426,8 +425,8 @@ class Subchannel : public DualRefCounted<Subchannel> {
// Backoff state.
BackOff backoff_ ABSL_GUARDED_BY(mu_);
Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_closure on_retry_timer_ ABSL_GUARDED_BY(mu_);
grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_
ABSL_GUARDED_BY(mu_);
// Keepalive time period (-1 for unset)
int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;

Loading…
Cancel
Save