retry: send at most one cancel_stream op on each call attempt (#28607)

pull/28504/head^2
Mark D. Roth 3 years ago committed by GitHub
parent 97584d8346
commit 6cdeb9de1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      src/core/ext/filters/client_channel/retry_filter.cc

@ -371,9 +371,10 @@ class RetryFilter::CallData {
void AddBatchForInternalRecvTrailingMetadata(
CallCombinerClosureList* closures);
// Adds a batch to closures to cancel this call attempt.
void AddBatchForCancelOp(grpc_error_handle error,
CallCombinerClosureList* closures);
// Adds a batch to closures to cancel this call attempt, if
// cancellation has not already been sent on the LB call.
void MaybeAddBatchForCancelOp(grpc_error_handle error,
CallCombinerClosureList* closures);
// Adds batches for pending batches to closures.
void AddBatchesForPendingBatches(CallCombinerClosureList* closures);
@ -447,6 +448,7 @@ class RetryFilter::CallData {
bool completed_recv_initial_metadata_ : 1;
bool started_recv_trailing_metadata_ : 1;
bool completed_recv_trailing_metadata_ : 1;
bool sent_cancel_stream_ : 1;
// State for callback processing.
RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
@ -662,6 +664,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
completed_recv_initial_metadata_(false),
started_recv_trailing_metadata_(false),
completed_recv_trailing_metadata_(false),
sent_cancel_stream_(false),
seen_recv_trailing_metadata_from_surface_(false),
abandoned_(false) {
lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_);
@ -872,8 +875,13 @@ void RetryFilter::CallData::CallAttempt::
"starting internal recv_trailing_metadata", closures);
}
void RetryFilter::CallData::CallAttempt::AddBatchForCancelOp(
void RetryFilter::CallData::CallAttempt::MaybeAddBatchForCancelOp(
grpc_error_handle error, CallCombinerClosureList* closures) {
if (sent_cancel_stream_) {
GRPC_ERROR_UNREF(error);
return;
}
sent_cancel_stream_ = true;
BatchData* cancel_batch_data = CreateBatch(1, /*set_on_complete=*/true);
cancel_batch_data->AddCancelStreamOp(error);
AddClosureForBatch(cancel_batch_data->batch(),
@ -1221,7 +1229,7 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked(
// Cancel this attempt.
// TODO(roth): When implementing hedging, we should not cancel the
// current attempt.
call_attempt->AddBatchForCancelOp(
call_attempt->MaybeAddBatchForCancelOp(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"retry perAttemptRecvTimeout exceeded"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
@ -1401,7 +1409,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error);
CallCombinerClosureList closures;
if (error != GRPC_ERROR_NONE) {
call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures);
call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error),
&closures);
}
if (!call_attempt->started_recv_trailing_metadata_) {
// recv_trailing_metadata not yet started by application; start it
@ -1498,7 +1507,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
call_attempt->recv_message_error_ = GRPC_ERROR_REF(error);
CallCombinerClosureList closures;
if (error != GRPC_ERROR_NONE) {
call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures);
call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error),
&closures);
}
if (!call_attempt->started_recv_trailing_metadata_) {
// recv_trailing_metadata not yet started by application; start it
@ -1693,7 +1703,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
calld->StartRetryTimer(server_pushback_ms);
// Cancel call attempt.
CallCombinerClosureList closures;
call_attempt->AddBatchForCancelOp(
call_attempt->MaybeAddBatchForCancelOp(
error == GRPC_ERROR_NONE
? grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"),
@ -1812,7 +1822,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
call_attempt->on_complete_deferred_batches_.emplace_back(
std::move(batch_data), GRPC_ERROR_REF(error));
CallCombinerClosureList closures;
call_attempt->AddBatchForCancelOp(GRPC_ERROR_REF(error), &closures);
call_attempt->MaybeAddBatchForCancelOp(GRPC_ERROR_REF(error), &closures);
if (!call_attempt->started_recv_trailing_metadata_) {
// recv_trailing_metadata not yet started by application; start it
// ourselves to get status.

Loading…
Cancel
Save