From 6cdeb9de1ad3e85e736f2a12e675172e18377c60 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 21 Jan 2022 07:39:44 -0800 Subject: [PATCH] retry: send at most one cancel_stream op on each call attempt (#28607) --- .../filters/client_channel/retry_filter.cc | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index f42928a630b..2561e12c992 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -371,9 +371,10 @@ class RetryFilter::CallData { void AddBatchForInternalRecvTrailingMetadata( CallCombinerClosureList* closures); - // Adds a batch to closures to cancel this call attempt. - void AddBatchForCancelOp(grpc_error_handle error, - CallCombinerClosureList* closures); + // Adds a batch to closures to cancel this call attempt, if + // cancellation has not already been sent on the LB call. + void MaybeAddBatchForCancelOp(grpc_error_handle error, + CallCombinerClosureList* closures); // Adds batches for pending batches to closures. void AddBatchesForPendingBatches(CallCombinerClosureList* closures); @@ -447,6 +448,7 @@ class RetryFilter::CallData { bool completed_recv_initial_metadata_ : 1; bool started_recv_trailing_metadata_ : 1; bool completed_recv_trailing_metadata_ : 1; + bool sent_cancel_stream_ : 1; // State for callback processing. RefCountedPtr recv_initial_metadata_ready_deferred_batch_; grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE; @@ -662,6 +664,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) completed_recv_initial_metadata_(false), started_recv_trailing_metadata_(false), completed_recv_trailing_metadata_(false), + sent_cancel_stream_(false), seen_recv_trailing_metadata_from_surface_(false), abandoned_(false) { lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_); @@ -872,8 +875,13 @@ void RetryFilter::CallData::CallAttempt:: "starting internal recv_trailing_metadata", closures); } -void RetryFilter::CallData::CallAttempt::AddBatchForCancelOp( +void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp( grpc_error_handle error, CallCombinerClosureList* closures) { + if (sent_cancel_stream_) { + GRPC_ERROR_UNREF(error); + return; + } + sent_cancel_stream_ = true; BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true); cancel_batch_data->AddCancelStreamOp(error); AddClosureForBatch(cancel_batch_data->batch(), @@ -1221,7 +1229,7 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked( // Cancel this attempt. // TODO(roth): When implementing hedging, we should not cancel the // current attempt. - call_attempt->AddBatchForCancelOp( + call_attempt->MaybeAddBatchForCancelOp( grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "retry perAttemptRecvTimeout exceeded"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED), @@ -1401,7 +1409,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady( call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error); CallCombinerClosureList closures; if (error != GRPC_ERROR_NONE) { - call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures); + call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error), + &closures); } if (!call_attempt->started_recv_trailing_metadata_) { // recv_trailing_metadata not yet started by application; start it @@ -1498,7 +1507,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady( call_attempt->recv_message_error_ = GRPC_ERROR_REF(error); CallCombinerClosureList closures; if (error != GRPC_ERROR_NONE) { - call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures); + call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error), + &closures); } if (!call_attempt->started_recv_trailing_metadata_) { // recv_trailing_metadata not yet started by application; start it @@ -1693,7 +1703,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( calld->StartRetryTimer(server_pushback_ms); // Cancel call attempt. CallCombinerClosureList closures; - call_attempt->AddBatchForCancelOp( + call_attempt->MaybeAddBatchForCancelOp( error == GRPC_ERROR_NONE ? grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"), @@ -1812,7 +1822,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( call_attempt->on_complete_deferred_batches_.emplace_back( std::move(batch_data), GRPC_ERROR_REF(error)); CallCombinerClosureList closures; - call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures); + call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error), &closures); if (!call_attempt->started_recv_trailing_metadata_) { // recv_trailing_metadata not yet started by application; start it // ourselves to get status.