EventEngine::RunAfter migration: SubchannelStreamClient (#31838)

Note that the `StartRetryTimerLocked` code path is not covered by tests,
and I'm having trouble adding a test case to cover it. It seems we would
need the health service (default_health_check_service) to shut down and
end the server-side streaming RPC (sending the trailing metadata) while
keeping the subchannel (and subchannel_stream_client) alive, so that it
can retry the Health.Watch RPC.
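
For readers unfamiliar with the new API, here is a minimal standalone sketch of the timer pattern this PR adopts; the class name and wiring below are illustrative, not code from this PR. The timer is armed with `EventEngine::RunAfter`, which returns a `TaskHandle` that a later `Cancel` call can try to revoke:

```cpp
#include <memory>
#include <utility>

#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>

using ::grpc_event_engine::experimental::EventEngine;

// Illustrative class (not from this PR): arms a one-shot retry timer via
// EventEngine::RunAfter and cancels it on shutdown. In the actual diff the
// engine pointer comes out of the subchannel's ChannelArgs.
class RetryTimerHolder {
 public:
  explicit RetryTimerHolder(std::shared_ptr<EventEngine> engine)
      : engine_(std::move(engine)) {}

  void Start(EventEngine::Duration timeout) {
    // RunAfter returns a TaskHandle; keep it so the timer can be cancelled.
    // NOTE: production code (like this PR) captures a strong ref instead of
    // a bare `this`, since the closure may outlive the caller's references.
    timer_handle_ = engine_->RunAfter(timeout, [this]() { OnTimer(); });
  }

  void Shutdown() {
    if (timer_handle_.has_value()) {
      // Cancel() returns false when the closure already ran or is about to
      // run, so the closure must also tolerate racing with shutdown.
      engine_->Cancel(*timer_handle_);
      timer_handle_.reset();
    }
  }

 private:
  void OnTimer() { /* restart the watch call */ }

  std::shared_ptr<EventEngine> engine_;
  absl::optional<EventEngine::TaskHandle> timer_handle_;
};
```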


Files changed:

1. src/core/ext/filters/client_channel/subchannel_stream_client.cc (51 changes)
2. src/core/ext/filters/client_channel/subchannel_stream_client.h (13 changes)

src/core/ext/filters/client_channel/subchannel_stream_client.cc

@@ -33,6 +33,7 @@
 #include "src/core/lib/gprpp/status_helper.h"
 #include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/time.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/resource_quota/resource_quota.h"
 #include "src/core/lib/transport/error_utils.h"
@@ -43,6 +44,8 @@
 namespace grpc_core {

+using ::grpc_event_engine::experimental::EventEngine;
+
 //
 // SubchannelStreamClient
 //
@@ -69,12 +72,11 @@ SubchannelStreamClient::SubchannelStreamClient(
           .set_multiplier(SUBCHANNEL_STREAM_RECONNECT_BACKOFF_MULTIPLIER)
           .set_jitter(SUBCHANNEL_STREAM_RECONNECT_JITTER)
           .set_max_backoff(Duration::Seconds(
-              SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))) {
+              SUBCHANNEL_STREAM_RECONNECT_MAX_BACKOFF_SECONDS))),
+      event_engine_(connected_subchannel_->args().GetObject<EventEngine>()) {
   if (GPR_UNLIKELY(tracer_ != nullptr)) {
     gpr_log(GPR_INFO, "%s %p: created SubchannelStreamClient", tracer_, this);
   }
-  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
-                    grpc_schedule_on_exec_ctx);
   StartCall();
 }
@@ -94,8 +96,9 @@ void SubchannelStreamClient::Orphan() {
     MutexLock lock(&mu_);
     event_handler_.reset();
     call_state_.reset();
-    if (retry_timer_callback_pending_) {
-      grpc_timer_cancel(&retry_timer_);
+    if (retry_timer_handle_.has_value()) {
+      event_engine_->Cancel(*retry_timer_handle_);
+      retry_timer_handle_.reset();
     }
   }
   Unref(DEBUG_LOCATION, "orphan");
@@ -124,11 +127,10 @@ void SubchannelStreamClient::StartRetryTimerLocked() {
   if (event_handler_ != nullptr) {
     event_handler_->OnRetryTimerStartLocked(this);
   }
-  Timestamp next_try = retry_backoff_.NextAttemptTime();
+  const Duration timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
   if (GPR_UNLIKELY(tracer_ != nullptr)) {
     gpr_log(GPR_INFO, "%s %p: SubchannelStreamClient health check call lost...",
             tracer_, this);
-    Duration timeout = next_try - Timestamp::Now();
     if (timeout > Duration::Zero()) {
       gpr_log(GPR_INFO, "%s %p: ... will retry in %" PRId64 "ms.", tracer_,
               this, timeout.millis());
@@ -136,28 +138,27 @@ void SubchannelStreamClient::StartRetryTimerLocked() {
       gpr_log(GPR_INFO, "%s %p: ... retrying immediately.", tracer_, this);
     }
   }
-  // Ref for callback, tracked manually.
-  Ref(DEBUG_LOCATION, "health_retry_timer").release();
-  retry_timer_callback_pending_ = true;
-  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
+  retry_timer_handle_ = event_engine_->RunAfter(
+      timeout, [self = Ref(DEBUG_LOCATION, "health_retry_timer")]() mutable {
+        ApplicationCallbackExecCtx callback_exec_ctx;
+        ExecCtx exec_ctx;
+        self->OnRetryTimer();
+        self.reset(DEBUG_LOCATION, "health_retry_timer");
+      });
 }

-void SubchannelStreamClient::OnRetryTimer(void* arg, grpc_error_handle error) {
-  auto* self = static_cast<SubchannelStreamClient*>(arg);
-  {
-    MutexLock lock(&self->mu_);
-    self->retry_timer_callback_pending_ = false;
-    if (self->event_handler_ != nullptr && error.ok() &&
-        self->call_state_ == nullptr) {
-      if (GPR_UNLIKELY(self->tracer_ != nullptr)) {
-        gpr_log(GPR_INFO,
-                "%s %p: SubchannelStreamClient restarting health check call",
-                self->tracer_, self);
-      }
-      self->StartCallLocked();
-    }
-  }
-  self->Unref(DEBUG_LOCATION, "health_retry_timer");
-}
+void SubchannelStreamClient::OnRetryTimer() {
+  MutexLock lock(&mu_);
+  if (event_handler_ != nullptr && retry_timer_handle_.has_value() &&
+      call_state_ == nullptr) {
+    if (GPR_UNLIKELY(tracer_ != nullptr)) {
+      gpr_log(GPR_INFO,
+              "%s %p: SubchannelStreamClient restarting health check call",
+              tracer_, this);
+    }
+    StartCallLocked();
+  }
+  retry_timer_handle_.reset();
+}

 //
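A note on the `[self = Ref(...)] ... self.reset(...)` dance in the hunk above: unlike a `grpc_timer` closure, the EventEngine closure may run on an arbitrary thread at a point where the caller's references could already be gone, so the lambda itself owns a strong ref for its whole lifetime. A toy illustration of the same idea, with `std::shared_ptr` standing in for `RefCountedPtr` and a thread-based `RunAfter` stand-in (everything here is hypothetical, not gRPC code):

```cpp
#include <chrono>
#include <cstdio>
#include <memory>
#include <thread>

// Stand-in for EventEngine::RunAfter: runs `fn` on a detached thread after
// `delay`. Illustration only.
template <typename Fn>
void RunAfter(std::chrono::milliseconds delay, Fn fn) {
  std::thread([delay, fn = std::move(fn)]() mutable {
    std::this_thread::sleep_for(delay);
    fn();
  }).detach();
}

struct Client : std::enable_shared_from_this<Client> {
  void StartRetryTimer() {
    // Capturing shared_from_this() pins the object until the callback
    // finishes, just as [self = Ref(...)] does in the PR's lambda.
    RunAfter(std::chrono::milliseconds(50),
             [self = shared_from_this()]() { self->OnRetryTimer(); });
  }
  void OnRetryTimer() { std::printf("retry fired\n"); }
};

int main() {
  // The temporary shared_ptr dies immediately, but the lambda's copy keeps
  // the Client alive until the timer fires.
  std::make_shared<Client>()->StartRetryTimer();
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
```

This is also why the new `OnRetryTimer()` checks `retry_timer_handle_.has_value()` rather than the old `error.ok()`: `Orphan()` resets the handle under the lock, so a callback that lost the cancellation race sees an empty handle and skips restarting the call.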
src/core/ext/filters/client_channel/subchannel_stream_client.h

@@ -27,6 +27,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
+#include <grpc/event_engine/event_engine.h>
 #include <grpc/event_engine/memory_allocator.h>
 #include <grpc/slice.h>
 #include <grpc/status.h>
@@ -42,7 +43,6 @@
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/iomgr_fwd.h"
 #include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/resource_quota/arena.h"
 #include "src/core/lib/resource_quota/memory_quota.h"
 #include "src/core/lib/slice/slice.h"
@ -196,7 +196,7 @@ class SubchannelStreamClient
void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
static void OnRetryTimer(void* arg, grpc_error_handle error);
void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_pollset_set* interested_parties_; // Do not own.
@@ -212,9 +212,12 @@ class SubchannelStreamClient
   // Call retry state.
   BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
-  grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
-  grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
-  bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
+  absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
+      retry_timer_handle_ ABSL_GUARDED_BY(mu_);
+  // A raw pointer will suffice since connected_subchannel_ holds a copy of the
+  // ChannelArgs which holds an std::shared_ptr of the EventEngine.
+  grpc_event_engine::experimental::EventEngine* event_engine_
+      ABSL_GUARDED_BY(mu_);
 };

 }  // namespace grpc_core
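
On the raw-pointer comment in the last hunk: `ChannelArgs` owns the engine via `std::shared_ptr`, and the client keeps only a non-owning pointer whose validity is guaranteed by `connected_subchannel_` outliving it. The same lifetime argument in miniature, with hypothetical `Owner`/`Borrower` names (illustrative only, not gRPC code):

```cpp
#include <cassert>
#include <memory>

struct Engine { int ticks = 0; };

// Owner holds the shared_ptr (like ChannelArgs holding the EventEngine).
struct Owner {
  std::shared_ptr<Engine> engine = std::make_shared<Engine>();
  Engine* GetObject() const { return engine.get(); }  // non-owning view
};

// Borrower keeps only a raw pointer, which is safe exactly as long as it
// never outlives Owner -- what the header comment asserts for
// connected_subchannel_ and event_engine_.
struct Borrower {
  explicit Borrower(const Owner& owner) : engine_(owner.GetObject()) {}
  Engine* engine_;
};

int main() {
  Owner owner;
  Borrower b(owner);  // b must be destroyed before owner
  b.engine_->ticks++;
  assert(owner.engine->ticks == 1);
}
```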
