diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc
index 0778cf8c3e2..c52b6300208 100644
--- a/src/core/ext/filters/client_channel/retry_filter.cc
+++ b/src/core/ext/filters/client_channel/retry_filter.cc
@@ -35,6 +35,7 @@
 #include "absl/strings/strip.h"
 #include "absl/types/optional.h"
 
+#include <grpc/event_engine/event_engine.h>
 #include <grpc/grpc.h>
 #include <grpc/slice.h>
 #include <grpc/status.h>
@@ -64,7 +65,6 @@
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/iomgr/polling_entity.h"
-#include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/resource_quota/arena.h"
 #include "src/core/lib/service_config/service_config.h"
 #include "src/core/lib/service_config/service_config_call_data.h"
@@ -132,6 +132,7 @@ namespace grpc_core {
 
 namespace {
 
+using grpc_event_engine::experimental::EventEngine;
 using internal::RetryGlobalConfig;
 using internal::RetryMethodConfig;
 using internal::RetryServiceConfigParser;
@@ -176,6 +177,7 @@ class RetryFilter {
   RetryFilter(const ChannelArgs& args, grpc_error_handle* error)
       : client_channel_(args.GetObject<ClientChannel>()),
+        event_engine_(args.GetObject<EventEngine>()),
         per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
         service_config_parser_index_(
             internal::RetryServiceConfigParser::ParserIndex()) {
@@ -212,6 +214,7 @@ class RetryFilter {
       const grpc_call_context_element* context);
 
   ClientChannel* client_channel_;
+  EventEngine* const event_engine_;
   size_t per_rpc_retry_buffer_size_;
   RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
   const size_t service_config_parser_index_;
@@ -442,7 +445,7 @@ class RetryFilter::CallData {
     // Abandons the call attempt. Unrefs any deferred batches.
     void Abandon();
 
-    static void OnPerAttemptRecvTimer(void* arg, grpc_error_handle error);
+    void OnPerAttemptRecvTimer();
     static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
     void MaybeCancelPerAttemptRecvTimer();
 
@@ -451,9 +454,8 @@ class RetryFilter::CallData {
     OrphanablePtr<ClientChannel::LoadBalancedCall> lb_call_;
     bool lb_call_committed_ = false;
 
-    grpc_timer per_attempt_recv_timer_;
     grpc_closure on_per_attempt_recv_timer_;
-    bool per_attempt_recv_timer_pending_ = false;
+    absl::optional<EventEngine::TaskHandle> per_attempt_recv_timer_handle_;
 
     // BatchData.batch.payload points to this.
     grpc_transport_stream_op_batch_payload batch_payload_;
@@ -548,8 +550,8 @@ class RetryFilter::CallData {
   // If server_pushback is nullopt, retry_backoff_ is used.
   void StartRetryTimer(absl::optional<Duration> server_pushback);
 
-  static void OnRetryTimer(void* arg, grpc_error_handle error);
-  static void OnRetryTimerLocked(void* arg, grpc_error_handle error);
+  void OnRetryTimer();
+  static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);
 
   // Adds a closure to closures to start a transparent retry.
   void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
@@ -602,11 +604,10 @@ class RetryFilter::CallData {
   // Retry state.
   bool retry_committed_ : 1;
-  bool retry_timer_pending_ : 1;
   bool retry_codepath_started_ : 1;
   bool sent_transparent_retry_not_seen_by_server_ : 1;
   int num_attempts_completed_ = 0;
-  grpc_timer retry_timer_;
+  absl::optional<EventEngine::TaskHandle> retry_timer_handle_;
   grpc_closure retry_closure_;
 
   // Cached data for retrying send ops.
@@ -715,23 +716,23 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld,
   // If per_attempt_recv_timeout is set, start a timer.
   if (calld->retry_policy_ != nullptr &&
       calld->retry_policy_->per_attempt_recv_timeout().has_value()) {
-    Timestamp per_attempt_recv_deadline =
-        Timestamp::Now() + *calld->retry_policy_->per_attempt_recv_timeout();
+    const Duration per_attempt_recv_timeout =
+        *calld->retry_policy_->per_attempt_recv_timeout();
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
               "chand=%p calld=%p attempt=%p: per-attempt timeout in %" PRId64
               " ms",
-              calld->chand_, calld, this,
-              calld->retry_policy_->per_attempt_recv_timeout()->millis());
+              calld->chand_, calld, this, per_attempt_recv_timeout.millis());
     }
     // Schedule retry after computed delay.
-    GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimer, this,
-                      nullptr);
     GRPC_CALL_STACK_REF(calld->owning_call_, "OnPerAttemptRecvTimer");
     Ref(DEBUG_LOCATION, "OnPerAttemptRecvTimer").release();
-    per_attempt_recv_timer_pending_ = true;
-    grpc_timer_init(&per_attempt_recv_timer_, per_attempt_recv_deadline,
-                    &on_per_attempt_recv_timer_);
+    per_attempt_recv_timer_handle_ = calld_->chand_->event_engine_->RunAfter(
+        per_attempt_recv_timeout, [this] {
+          ApplicationCallbackExecCtx callback_exec_ctx;
+          ExecCtx exec_ctx;
+          OnPerAttemptRecvTimer();
+        });
   }
 }
 
@@ -795,7 +796,7 @@ void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
   // If we've already switched to fast path, there's nothing to do here.
   if (calld_->committed_call_ != nullptr) return;
   // If the perAttemptRecvTimeout timer is pending, we can't switch yet.
-  if (per_attempt_recv_timer_pending_) return;
+  if (per_attempt_recv_timer_handle_.has_value()) return;
   // If there are still send ops to replay, we can't switch yet.
   if (HaveSendOpsToReplay()) return;
   // If we started an internal batch for recv_trailing_metadata but have not
@@ -1241,14 +1242,11 @@ void RetryFilter::CallData::CallAttempt::Abandon() {
   on_complete_deferred_batches_.clear();
 }
 
-void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer(
-    void* arg, grpc_error_handle error) {
-  auto* call_attempt = static_cast<CallAttempt*>(arg);
-  GRPC_CLOSURE_INIT(&call_attempt->on_per_attempt_recv_timer_,
-                    OnPerAttemptRecvTimerLocked, call_attempt, nullptr);
-  GRPC_CALL_COMBINER_START(call_attempt->calld_->call_combiner_,
-                           &call_attempt->on_per_attempt_recv_timer_, error,
-                           "per-attempt timer fired");
+void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimer() {
+  GRPC_CLOSURE_INIT(&on_per_attempt_recv_timer_, OnPerAttemptRecvTimerLocked,
+                    this, nullptr);
+  GRPC_CALL_COMBINER_START(calld_->call_combiner_, &on_per_attempt_recv_timer_,
+                           absl::OkStatus(), "per-attempt timer fired");
 }
 
 void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
@@ -1258,35 +1256,33 @@
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p attempt=%p: perAttemptRecvTimeout timer fired: "
-            "error=%s, per_attempt_recv_timer_pending_=%d",
+            "error=%s, per_attempt_recv_timer_handle_.has_value()=%d",
             calld->chand_, calld, call_attempt, StatusToString(error).c_str(),
-            call_attempt->per_attempt_recv_timer_pending_);
+            call_attempt->per_attempt_recv_timer_handle_.has_value());
   }
   CallCombinerClosureList closures;
-  if (error.ok() && call_attempt->per_attempt_recv_timer_pending_) {
-    call_attempt->per_attempt_recv_timer_pending_ = false;
-    // Cancel this attempt.
-    // TODO(roth): When implementing hedging, we should not cancel the
-    // current attempt.
-    call_attempt->MaybeAddBatchForCancelOp(
-        grpc_error_set_int(
-            GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
-            StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
-        &closures);
-    // Check whether we should retry.
-    if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
-                                  /*server_pushback_ms=*/absl::nullopt)) {
-      // Mark current attempt as abandoned.
-      call_attempt->Abandon();
-      // We are retrying. Start backoff timer.
-      calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
-    } else {
-      // Not retrying, so commit the call.
-      calld->RetryCommit(call_attempt);
-      // If retry state is no longer needed, switch to fast path for
-      // subsequent batches.
-      call_attempt->MaybeSwitchToFastPath();
-    }
+  call_attempt->per_attempt_recv_timer_handle_.reset();
+  // Cancel this attempt.
+  // TODO(roth): When implementing hedging, we should not cancel the
+  // current attempt.
+  call_attempt->MaybeAddBatchForCancelOp(
+      grpc_error_set_int(
+          GRPC_ERROR_CREATE("retry perAttemptRecvTimeout exceeded"),
+          StatusIntProperty::kRpcStatus, GRPC_STATUS_CANCELLED),
+      &closures);
+  // Check whether we should retry.
+  if (call_attempt->ShouldRetry(/*status=*/absl::nullopt,
+                                /*server_pushback_ms=*/absl::nullopt)) {
+    // Mark current attempt as abandoned.
+    call_attempt->Abandon();
+    // We are retrying. Start backoff timer.
+    calld->StartRetryTimer(/*server_pushback=*/absl::nullopt);
+  } else {
+    // Not retrying, so commit the call.
+    calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
   }
   closures.RunClosures(calld->call_combiner_);
   call_attempt->Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
@@ -1294,15 +1290,19 @@
 }
 
 void RetryFilter::CallData::CallAttempt::MaybeCancelPerAttemptRecvTimer() {
-  if (per_attempt_recv_timer_pending_) {
+  if (per_attempt_recv_timer_handle_.has_value()) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
               "chand=%p calld=%p attempt=%p: cancelling "
               "perAttemptRecvTimeout timer",
              calld_->chand_, calld_, this);
     }
-    per_attempt_recv_timer_pending_ = false;
-    grpc_timer_cancel(&per_attempt_recv_timer_);
+    if (calld_->chand_->event_engine_->Cancel(
+            *per_attempt_recv_timer_handle_)) {
+      Unref(DEBUG_LOCATION, "OnPerAttemptRecvTimer");
+      GRPC_CALL_STACK_UNREF(calld_->owning_call_, "OnPerAttemptRecvTimer");
+    }
+    per_attempt_recv_timer_handle_.reset();
   }
 }
 
@@ -2154,7 +2154,6 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
       pending_send_message_(false),
       pending_send_trailing_metadata_(false),
       retry_committed_(false),
-      retry_timer_pending_(false),
       retry_codepath_started_(false),
       sent_transparent_retry_not_seen_by_server_(false) {}
 
@@ -2215,13 +2214,15 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
       return;
     }
     // Cancel retry timer if needed.
-    if (retry_timer_pending_) {
+    if (retry_timer_handle_.has_value()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling retry timer", chand_,
                this);
      }
-      retry_timer_pending_ = false;  // Lame timer callback.
-      grpc_timer_cancel(&retry_timer_);
+      if (chand_->event_engine_->Cancel(*retry_timer_handle_)) {
+        GRPC_CALL_STACK_UNREF(owning_call_, "OnRetryTimer");
+      }
+      retry_timer_handle_.reset();
       FreeAllCachedSendOpData();
     }
     // We have no call attempt, so there's nowhere to send the cancellation
@@ -2235,7 +2236,7 @@
   PendingBatch* pending = PendingBatchesAdd(batch);
   // If the timer is pending, yield the call combiner and wait for it to
   // run, since we don't want to start another call attempt until it does.
-  if (retry_timer_pending_) {
+  if (retry_timer_handle_.has_value()) {
     GRPC_CALL_COMBINER_STOP(call_combiner_,
                             "added pending batch while retry timer pending");
     return;
@@ -2569,42 +2570,40 @@ void RetryFilter::CallData::StartRetryTimer(
   // Reset call attempt.
   call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer");
   // Compute backoff delay.
-  Timestamp next_attempt_time;
+  Duration next_attempt_timeout;
   if (server_pushback.has_value()) {
     GPR_ASSERT(*server_pushback >= Duration::Zero());
-    next_attempt_time = Timestamp::Now() + *server_pushback;
+    next_attempt_timeout = *server_pushback;
     retry_backoff_.Reset();
   } else {
-    next_attempt_time = retry_backoff_.NextAttemptTime();
+    next_attempt_timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
   }
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
-            this, (next_attempt_time - Timestamp::Now()).millis());
+            this, next_attempt_timeout.millis());
   }
   // Schedule retry after computed delay.
-  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
   GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
-  retry_timer_pending_ = true;
-  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
+  retry_timer_handle_ =
+      chand_->event_engine_->RunAfter(next_attempt_timeout, [this] {
+        ApplicationCallbackExecCtx callback_exec_ctx;
+        ExecCtx exec_ctx;
+        OnRetryTimer();
+      });
 }
 
-void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
-  auto* calld = static_cast<CallData*>(arg);
-  GRPC_CLOSURE_INIT(&calld->retry_closure_, OnRetryTimerLocked, calld, nullptr);
-  GRPC_CALL_COMBINER_START(calld->call_combiner_, &calld->retry_closure_, error,
+void RetryFilter::CallData::OnRetryTimer() {
+  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimerLocked, this, nullptr);
+  GRPC_CALL_COMBINER_START(call_combiner_, &retry_closure_, absl::OkStatus(),
                            "retry timer fired");
 }
 
 void RetryFilter::CallData::OnRetryTimerLocked(void* arg,
-                                               grpc_error_handle error) {
+                                               grpc_error_handle /*error*/) {
   auto* calld = static_cast<CallData*>(arg);
-  if (error.ok() && calld->retry_timer_pending_) {
-    calld->retry_timer_pending_ = false;
-    calld->CreateCallAttempt(/*is_transparent_retry=*/false);
-  } else {
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled");
-  }
+  calld->retry_timer_handle_.reset();
+  calld->CreateCallAttempt(/*is_transparent_retry=*/false);
   GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
 }
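
Two things change in how the timers are armed. grpc_timer_init took an absolute Timestamp deadline, while EventEngine::RunAfter takes a relative Duration, which is why StartRetryTimer now computes retry_backoff_.NextAttemptTime() - Timestamp::Now() and the per-attempt path passes the configured timeout directly. The callback also no longer arrives as a grpc_closure on an iomgr thread; it runs on an EventEngine thread, so it must construct an ApplicationCallbackExecCtx and ExecCtx itself before bouncing into the call combiner. A minimal sketch of that shape against the public EventEngine interface; ScheduleRetry is a hypothetical helper written only for illustration, not part of this change (the filter itself pulls the engine out of ChannelArgs and stores the returned handle in an absl::optional member):

#include <grpc/event_engine/event_engine.h>

#include <chrono>
#include <cstdio>

using ::grpc_event_engine::experimental::EventEngine;

// Hypothetical helper: arms a one-shot timer on an EventEngine and returns
// the handle needed to cancel it later.
EventEngine::TaskHandle ScheduleRetry(EventEngine* engine,
                                      std::chrono::milliseconds delay) {
  return engine->RunAfter(delay, [] {
    // The callback runs on an EventEngine thread with no ExecCtx of its own;
    // the real filter creates ApplicationCallbackExecCtx and ExecCtx here
    // before re-entering the call combiner.
    std::printf("retry timer fired\n");
  });
}

A caller that holds the returned TaskHandle can later hand it to EventEngine::Cancel, which is exactly what the new absl::optional<EventEngine::TaskHandle> members are for.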
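
The subtler part is cancellation. grpc_timer_cancel still ran the pending closure, just with GRPC_ERROR_CANCELLED, so the callback could always drop the call-stack and CallAttempt refs itself; that is what the old error.ok()/retry_timer_pending_ checks and the "Lame timer callback" comment were for. EventEngine::Cancel instead returns true when it wins the race and the callback will never run, so the refs taken at arm time must be released by whichever side wins, exactly once; hence the Unref()/GRPC_CALL_STACK_UNREF() calls inside the new if (...->Cancel(...)) blocks. A self-contained model of that hand-off using only the standard library; OneShotTimer and FakeCallAttempt are illustrative stand-ins, not gRPC types:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <thread>

// Stand-in for EventEngine::RunAfter()/Cancel(): Arm() runs the callback on a
// background thread after `delay`; Cancel() returns true only if it won the
// race, i.e. the callback is guaranteed never to run.
class OneShotTimer {
 public:
  ~OneShotTimer() {
    if (thread_.joinable()) thread_.join();
  }

  void Arm(std::chrono::milliseconds delay, std::function<void()> cb) {
    thread_ = std::thread([this, delay, cb = std::move(cb)] {
      std::unique_lock<std::mutex> lock(mu_);
      if (cv_.wait_for(lock, delay, [this] { return cancelled_; })) {
        return;  // Cancelled before firing; Cancel() already returned true.
      }
      fired_ = true;  // Committed to running: Cancel() must now return false.
      lock.unlock();
      cb();
    });
  }

  bool Cancel() {
    std::lock_guard<std::mutex> lock(mu_);
    if (fired_) return false;  // Too late: the callback owns the cleanup.
    cancelled_ = true;
    cv_.notify_all();
    return true;  // Callback will never run: the caller must clean up.
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  bool cancelled_ = false;
  bool fired_ = false;
  std::thread thread_;
};

// Illustrative stand-in for the filter's CallAttempt: a ref is taken when the
// timer is armed, and exactly one of {timer callback, successful Cancel()}
// gives it back.
struct FakeCallAttempt {
  std::atomic<int> refs{1};
  OneShotTimer timer;

  void Ref() { refs.fetch_add(1); }
  void Unref(const char* reason) {
    std::printf("Unref(%s), refs now %d\n", reason, refs.fetch_sub(1) - 1);
  }

  void StartPerAttemptTimer(std::chrono::milliseconds timeout) {
    Ref();  // Held on behalf of the pending timer callback.
    timer.Arm(timeout, [this] {
      std::printf("per-attempt timer fired\n");
      Unref("OnPerAttemptRecvTimer");  // Callback ran, so it drops the ref.
    });
  }

  void MaybeCancelPerAttemptTimer() {
    if (timer.Cancel()) {
      // Callback will never run; the canceller drops the ref, mirroring the
      // Unref()/GRPC_CALL_STACK_UNREF() inside the Cancel() branch above.
      Unref("OnPerAttemptRecvTimer (cancelled)");
    }
  }
};

int main() {
  FakeCallAttempt attempt;
  attempt.StartPerAttemptTimer(std::chrono::milliseconds(50));
  attempt.MaybeCancelPerAttemptTimer();
}

Whether Cancel() wins or the callback fires first, the ref is released exactly once, which is the invariant the new Cancel() branches in the filter rely on.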