[EventEngine] RunAfter migration: RetryFilter (#32610)

Yijie Ma committed 2 years ago (via GitHub)
parent 4429066516
commit a51180dcdb
1 changed file (155 changed lines):
src/core/ext/filters/client_channel/retry_filter.cc
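
In short, this change moves the retry filter's two timers (the perAttemptRecvTimeout timer and the retry backoff timer) off iomgr's `grpc_timer` and onto `EventEngine::RunAfter()`. Each `grpc_timer` plus `bool *_pending_` pair becomes an `absl::optional<EventEngine::TaskHandle>`, scheduling takes a relative `Duration` rather than an absolute deadline, and cancellation uses the boolean result of `EventEngine::Cancel()` to decide whether the canceller or the callback releases the refs taken when the timer was scheduled. The sketch below illustrates that pattern against a minimal stand-in scheduler; `FakeEventEngine` and `RetryState` are hypothetical types for illustration, not gRPC's.

```cpp
// Hypothetical stand-in illustrating the migration pattern. This is not the
// gRPC EventEngine, just a minimal scheduler with the same RunAfter/Cancel
// contract (Cancel returns true only if the callback will never run).
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <optional>

class FakeEventEngine {
 public:
  using TaskHandle = int64_t;
  TaskHandle RunAfter(std::chrono::milliseconds /*delay*/,
                      std::function<void()> cb) {
    TaskHandle h = next_handle_++;
    tasks_[h] = std::move(cb);
    return h;
  }
  // Returns true if the task was removed before it ever ran.
  bool Cancel(TaskHandle h) { return tasks_.erase(h) > 0; }
  // Test helper: run all still-pending tasks.
  void FireAll() {
    for (auto& [h, cb] : tasks_) cb();
    tasks_.clear();
  }

 private:
  TaskHandle next_handle_ = 1;
  std::map<TaskHandle, std::function<void()>> tasks_;
};

// Before: grpc_timer retry_timer_ plus bool retry_timer_pending_.
// After: an optional task handle, as in this sketch.
struct RetryState {
  FakeEventEngine* engine;
  std::optional<FakeEventEngine::TaskHandle> retry_timer_handle;
  int refs = 1;  // stands in for the call-stack ref held for the callback

  void StartRetryTimer(std::chrono::milliseconds delay) {
    ++refs;  // GRPC_CALL_STACK_REF(..., "OnRetryTimer") in the real code
    retry_timer_handle = engine->RunAfter(delay, [this] { OnRetryTimer(); });
  }
  void OnRetryTimer() {
    retry_timer_handle.reset();
    // ... create the next call attempt ...
    --refs;  // GRPC_CALL_STACK_UNREF in the real code
  }
  void MaybeCancelRetryTimer() {
    if (!retry_timer_handle.has_value()) return;
    // Only drop the callback's ref if Cancel() guarantees it won't run;
    // otherwise the callback itself releases it when it fires.
    if (engine->Cancel(*retry_timer_handle)) --refs;
    retry_timer_handle.reset();
  }
};

int main() {
  FakeEventEngine engine;
  RetryState state{&engine};
  state.StartRetryTimer(std::chrono::milliseconds(100));
  state.MaybeCancelRetryTimer();  // cancels before firing; ref released here
  engine.FireAll();               // nothing left to run
  std::cout << "refs=" << state.refs << "\n";  // prints refs=1
  return 0;
}
```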

@@ -35,6 +35,7 @@
 #include "absl/strings/strip.h"
 #include "absl/types/optional.h"
 
+#include <grpc/event_engine/event_engine.h>
 #include <grpc/grpc.h>
 #include <grpc/slice.h>
 #include <grpc/status.h>
@@ -64,7 +65,6 @@
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/resource_quota/arena.h"
 #include "src/core/lib/service_config/service_config.h"
 #include "src/core/lib/service_config/service_config_call_data.h"
@@ -132,6 +132,7 @@ namespace grpc_core {
 namespace {
 
+using grpc_event_engine::experimental::EventEngine;
 using internal::RetryGlobalConfig;
 using internal::RetryMethodConfig;
 using internal::RetryServiceConfigParser;
@@ -176,6 +177,7 @@ class RetryFilter {
   RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
       : client_channel_(args.GetObject<ClientChannel>()),
+        event_engine_(args.GetObject<EventEngine>()),
         per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
         service_config_parser_index_(
            internal::RetryServiceConfigParser::ParserIndex()) {
@@ -212,6 +214,7 @@ class RetryFilter {
       const grpc_call_context_element* context);
 
   ClientChannel* client_channel_;
+  EventEngine* const event_engine_;
   size_t per_rpc_retry_buffer_size_;
   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
   const size_t service_config_parser_index_;
@@ -442,7 +445,7 @@ class RetryFilter::CallData {
     // Abandons the call attempt. Unrefs any deferred batches.
     void Abandon();
 
-    static void OnPerAttemptRecvTimer(void* arg, grpc_error_handle error);
+    void OnPerAttemptRecvTimer();
     static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
     void MaybeCancelPerAttemptRecvTimer();
@@ -451,9 +454,8 @@ class RetryFilter::CallData {
     OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
     bool lb_call_committed_ = false;
 
-    grpc_timer per_attempt_recv_timer_;
     grpc_closure on_per_attempt_recv_timer_;
-    bool per_attempt_recv_timer_pending_ = false;
+    absl::optional<EventEngine::TaskHandle> per_attempt_recv_timer_handle_;
 
     // BatchData.batch.payload points to this.
     grpc_transport_stream_op_batch_payload batch_payload_;
@@ -548,8 +550,8 @@ class RetryFilter::CallData {
   // If server_pushback is nullopt, retry_backoff_ is used.
   void StartRetryTimer(absl::optional<Duration> server_pushback);
-  static void OnRetryTimer(void* arg, grpc_error_handle error);
-  static void OnRetryTimerLocked(void* arg, grpc_error_handle error);
+  void OnRetryTimer();
+  static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);
 
   // Adds a closure to closures to start a transparent retry.
   void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
@@ -602,11 +604,10 @@ class RetryFilter::CallData {
   // Retry state.
   bool retry_committed_ : 1;
-  bool retry_timer_pending_ : 1;
   bool retry_codepath_started_ : 1;
   bool sent_transparent_retry_not_seen_by_server_ : 1;
   int num_attempts_completed_ = 0;
-  grpc_timer retry_timer_;
+  absl::optional<EventEngine::TaskHandle> retry_timer_handle_;
   grpc_closure retry_closure_;
 
   // Cached data for retrying send ops.
@@ -715,23 +716,23 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
   // If per_attempt_recv_timeout is set, start a timer.
   if (calld->retry_policy_ != nullptr &&
       calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
-    Timestamp per_attempt_recv_deadline =
-        Timestamp::Now() + *calld->retry_policy_->per_attempt_recv_timeout();
+    const Duration per_attempt_recv_timeout =
+        *calld->retry_policy_->per_attempt_recv_timeout();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
              " ms",
-             calld->chand_, calld, this,
-             calld->retry_policy_->per_attempt_recv_timeout()->millis());
+             calld->chand_, calld, this, per_attempt_recv_timeout.millis());
     }
     // Schedule retry after computed delay.
-    GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimer, this,
-                      nullptr);
     GRPC_CALL_STACK_REF(calld->owning_call_, "OnPerAttemptRecvTimer");
     Ref(DEBUG_LOCATION, "OnPerAttemptRecvTimer").release();
-    per_attempt_recv_timer_pending_ = true;
-    grpc_timer_init(&per_attempt_recv_timer_, per_attempt_recv_deadline,
-                    &on_per_attempt_recv_timer_);
+    per_attempt_recv_timer_handle_ = calld_->chand_->event_engine_->RunAfter(
+        per_attempt_recv_timeout, [this] {
+          ApplicationCallbackExecCtx callback_exec_ctx;
+          ExecCtx exec_ctx;
+          OnPerAttemptRecvTimer();
+        });
   }
 }
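
A note on the lambda passed to `RunAfter()` above: EventEngine callbacks are invoked on EventEngine threads with no gRPC execution context established, so the lambda constructs `ApplicationCallbackExecCtx` and `ExecCtx` before calling `OnPerAttemptRecvTimer()`, which then hops into the call combiner through the existing closure. A minimal sketch of that "establish the scoped context, then re-enter" shape, using hypothetical types (`ScopedExecCtx`, `OnTimerFired`) rather than gRPC's own:

```cpp
// Minimal sketch (hypothetical types, not gRPC's) of the pattern used in the
// RunAfter lambda above: an externally scheduled callback first establishes
// the scoped execution context that downstream code assumes is present.
#include <cassert>
#include <functional>
#include <vector>

class ScopedExecCtx {
 public:
  ScopedExecCtx() { current_ = this; }
  ~ScopedExecCtx() { current_ = nullptr; }
  static ScopedExecCtx* Get() { return current_; }

 private:
  static thread_local ScopedExecCtx* current_;
};
thread_local ScopedExecCtx* ScopedExecCtx::current_ = nullptr;

// Stands in for filter code that, like OnPerAttemptRecvTimer(), assumes an
// exec ctx is already established on the calling thread.
void OnTimerFired() { assert(ScopedExecCtx::Get() != nullptr); }

int main() {
  std::vector<std::function<void()>> scheduled;
  // Analogous to event_engine_->RunAfter(timeout, [this] { ... }).
  scheduled.push_back([] {
    ScopedExecCtx exec_ctx;  // established before re-entering filter code
    OnTimerFired();
  });
  for (auto& cb : scheduled) cb();
  return 0;
}
```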
@@ -795,7 +796,7 @@ void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
   // If we've already switched to fast path, there's nothing to do here.
   if (calld_->committed_call_ != nullptr) return;
   // If the perAttemptRecvTimeout timer is pending, we can't switch yet.
-  if (per_attempt_recv_timer_pending_) return;
+  if (per_attempt_recv_timer_handle_.has_value()) return;
   // If there are still send ops to replay, we can't switch yet.
   if (HaveSendOpsToReplay()) return;
   // If we started an internal batch for recv_trailing_metadata but have not
@@ -1241,14 +1242,11 @@ void RetryFilter::CallData::CallAttempt::Abandon() {
   on_complete_deferred_batches_.clear();
 }
 
-void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
-    void* arg, grpc_error_handle error) {
-  auto* call_attempt = static_cast<CallAttempt*>(arg);
-  GRPC_CLOSURE_INIT(&call_attempt->on_per_attempt_recv_timer_,
-                    OnPerAttemptRecvTimerLocked, call_attempt, nullptr);
-  GRPC_CALL_COMBINER_START(call_attempt->calld_->call_combiner_,
-                           &call_attempt->on_per_attempt_recv_timer_, error,
-                           "per-attempt timer fired");
+void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer() {
+  GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimerLocked,
+                    this, nullptr);
+  GRPC_CALL_COMBINER_START(calld_->call_combiner_, &on_per_attempt_recv_timer_,
+                           absl::OkStatus(), "per-attempt timer fired");
 }
 
 void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
@@ -1258,35 +1256,33 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
            "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
-           "error=%s, per_attempt_recv_timer_pending_=%d",
+           "error=%s, per_attempt_recv_timer_handle_.has_value()=%d",
            calld->chand_, calld, call_attempt, StatusToString(error).c_str(),
-           call_attempt->per_attempt_recv_timer_pending_);
+           call_attempt->per_attempt_recv_timer_handle_.has_value());
   }
   CallCombinerClosureList closures;
-  if (error.ok() && call_attempt->per_attempt_recv_timer_pending_) {
-    call_attempt->per_attempt_recv_timer_pending_ = false;
-    // Cancel this attempt.
-    // TODO(roth): When implementing hedging, we should not cancel the
-    // current attempt.
-    call_attempt->MaybeAddBatchForCancelOp(
-        grpc_error_set_int(
-            GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
-            StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
-        &closures);
-    // Check whether we should retry.
-    if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
-                                  /*server_pushback_ms=*/absl::nullopt)) {
-      // Mark current attempt as abandoned.
-      call_attempt->Abandon();
-      // We are retrying. Start backoff timer.
-      calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
-    } else {
-      // Not retrying, so commit the call.
-      calld->RetryCommit(call_attempt);
-      // If retry state is no longer needed, switch to fast path for
-      // subsequent batches.
-      call_attempt->MaybeSwitchToFastPath();
-    }
-  }
+  call_attempt->per_attempt_recv_timer_handle_.reset();
+  // Cancel this attempt.
+  // TODO(roth): When implementing hedging, we should not cancel the
+  // current attempt.
+  call_attempt->MaybeAddBatchForCancelOp(
+      grpc_error_set_int(
+          GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
+          StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
+      &closures);
+  // Check whether we should retry.
+  if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
+                                /*server_pushback_ms=*/absl::nullopt)) {
+    // Mark current attempt as abandoned.
+    call_attempt->Abandon();
+    // We are retrying. Start backoff timer.
+    calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
+  } else {
+    // Not retrying, so commit the call.
+    calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
+  }
   closures.RunClosures(calld->call_combiner_);
   call_attempt->Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
@@ -1294,15 +1290,19 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
 }
 
 void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
-  if (per_attempt_recv_timer_pending_) {
+  if (per_attempt_recv_timer_handle_.has_value()) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: cancelling "
              "perAttemptRecvTimeout timer",
              calld_->chand_, calld_, this);
     }
-    per_attempt_recv_timer_pending_ = false;
-    grpc_timer_cancel(&per_attempt_recv_timer_);
+    if (calld_->chand_->event_engine_->Cancel(
+            *per_attempt_recv_timer_handle_)) {
+      Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
+      GRPC_CALL_STACK_UNREF(calld_->owning_call_, "OnPerAttemptRecvTimer");
+    }
+    per_attempt_recv_timer_handle_.reset();
   }
 }
@@ -2154,7 +2154,6 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
       pending_send_message_(false),
       pending_send_trailing_metadata_(false),
       retry_committed_(false),
-      retry_timer_pending_(false),
       retry_codepath_started_(false),
       sent_transparent_retry_not_seen_by_server_(false) {}
@@ -2215,13 +2214,15 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
       return;
     }
     // Cancel retry timer if needed.
-    if (retry_timer_pending_) {
+    if (retry_timer_handle_.has_value()) {
       if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
         gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_,
                 this);
       }
-      retry_timer_pending_ = false;  // Lame timer callback.
-      grpc_timer_cancel(&retry_timer_);
+      if (chand_->event_engine_->Cancel(*retry_timer_handle_)) {
+        GRPC_CALL_STACK_UNREF(owning_call_, "OnRetryTimer");
+      }
+      retry_timer_handle_.reset();
       FreeAllCachedSendOpData();
     }
     // We have no call attempt, so there's nowhere to send the cancellation
@@ -2235,7 +2236,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
   PendingBatch* pending = PendingBatchesAdd(batch);
   // If the timer is pending, yield the call combiner and wait for it to
   // run, since we don't want to start another call attempt until it does.
-  if (retry_timer_pending_) {
+  if (retry_timer_handle_.has_value()) {
     GRPC_CALL_COMBINER_STOP(call_combiner_,
                             "added pending batch while retry timer pending");
     return;
@@ -2569,42 +2570,40 @@ void RetryFilter::CallData::StartRetryTimer(
   // Reset call attempt.
   call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer");
   // Compute backoff delay.
-  Timestamp next_attempt_time;
+  Duration next_attempt_timeout;
   if (server_pushback.has_value()) {
     GPR_ASSERT(*server_pushback >= Duration::Zero());
-    next_attempt_time = Timestamp::Now() + *server_pushback;
+    next_attempt_timeout = *server_pushback;
     retry_backoff_.Reset();
   } else {
-    next_attempt_time = retry_backoff_.NextAttemptTime();
+    next_attempt_timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
   }
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
-           this, (next_attempt_time - Timestamp::Now()).millis());
+           this, next_attempt_timeout.millis());
   }
   // Schedule retry after computed delay.
-  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
   GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
-  retry_timer_pending_ = true;
-  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
+  retry_timer_handle_ =
+      chand_->event_engine_->RunAfter(next_attempt_timeout, [this] {
+        ApplicationCallbackExecCtx callback_exec_ctx;
+        ExecCtx exec_ctx;
+        OnRetryTimer();
+      });
 }
 
-void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
-  auto* calld = static_cast<CallData*>(arg);
-  GRPC_CLOSURE_INIT(&calld->retry_closure_, OnRetryTimerLocked, calld, nullptr);
-  GRPC_CALL_COMBINER_START(calld->call_combiner_, &calld->retry_closure_, error,
+void RetryFilter::CallData::OnRetryTimer() {
+  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimerLocked, this, nullptr);
+  GRPC_CALL_COMBINER_START(call_combiner_, &retry_closure_, absl::OkStatus(),
                            "retry timer fired");
 }
 
 void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
-                                               grpc_error_handle error) {
+                                               grpc_error_handle /*error*/) {
   auto* calld = static_cast<CallData*>(arg);
-  if (error.ok() && calld->retry_timer_pending_) {
-    calld->retry_timer_pending_ = false;
-    calld->CreateCallAttempt(/*is_transparent_retry=*/false);
-  } else {
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled");
-  }
+  calld->retry_timer_handle_.reset();
+  calld->CreateCallAttempt(/*is_transparent_retry=*/false);
   GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
 }
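
One detail worth calling out in `StartRetryTimer()`: `grpc_timer_init()` took an absolute deadline, while `RunAfter()` takes a relative delay, which is why the code now computes `retry_backoff_.NextAttemptTime() - Timestamp::Now()` (or uses the server pushback directly and resets the backoff). Below is a rough sketch of that delay computation using `std::chrono`; `BackoffState` and its parameters are illustrative only and omit the jitter that gRPC's real backoff applies.

```cpp
// Sketch of the delay computation StartRetryTimer() performs above, using
// std::chrono in place of gRPC's Timestamp/Duration. BackoffState and its
// parameters are hypothetical, not gRPC's BackOff class.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <optional>

using Millis = std::chrono::milliseconds;

struct BackoffState {
  Millis initial{1000};
  double multiplier = 1.6;
  Millis max_delay{120000};
  Millis current{1000};

  // Returns the next relative delay and advances the backoff.
  Millis NextDelay() {
    Millis delay = current;
    current = std::min(
        Millis(static_cast<int64_t>(current.count() * multiplier)), max_delay);
    return delay;
  }
  void Reset() { current = initial; }
};

// If the server sent pushback, honor it exactly and reset the backoff;
// otherwise use (and advance) the exponential backoff. Either way the result
// is a relative delay, which is what RunAfter() expects, unlike the absolute
// deadline grpc_timer_init() took.
Millis ComputeRetryDelay(BackoffState& backoff,
                         std::optional<Millis> server_pushback) {
  if (server_pushback.has_value()) {
    backoff.Reset();
    return *server_pushback;
  }
  return backoff.NextDelay();
}

int main() {
  BackoffState backoff;
  Millis d1 = ComputeRetryDelay(backoff, std::nullopt);  // 1000 ms
  Millis d2 = ComputeRetryDelay(backoff, std::nullopt);  // 1600 ms
  Millis d3 = ComputeRetryDelay(backoff, Millis(250));   // pushback: 250 ms
  (void)d1;
  (void)d2;
  (void)d3;
  return 0;
}
```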
