diff --git a/CMakeLists.txt b/CMakeLists.txt index ea6df645a3b..82e1a6e848d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1156,6 +1156,7 @@ add_library(end2end_nosec_tests test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc test/core/end2end/tests/retry_recv_initial_metadata.cc test/core/end2end/tests/retry_recv_message.cc + test/core/end2end/tests/retry_send_op_fails.cc test/core/end2end/tests/retry_server_pushback_delay.cc test/core/end2end/tests/retry_server_pushback_disabled.cc test/core/end2end/tests/retry_streaming.cc @@ -1291,6 +1292,7 @@ add_library(end2end_tests test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc test/core/end2end/tests/retry_recv_initial_metadata.cc test/core/end2end/tests/retry_recv_message.cc + test/core/end2end/tests/retry_send_op_fails.cc test/core/end2end/tests/retry_server_pushback_delay.cc test/core/end2end/tests/retry_server_pushback_disabled.cc test/core/end2end/tests/retry_streaming.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index aadb297813b..76474d0e0e8 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -95,6 +95,7 @@ libs: - test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc - test/core/end2end/tests/retry_recv_initial_metadata.cc - test/core/end2end/tests/retry_recv_message.cc + - test/core/end2end/tests/retry_send_op_fails.cc - test/core/end2end/tests/retry_server_pushback_delay.cc - test/core/end2end/tests/retry_server_pushback_disabled.cc - test/core/end2end/tests/retry_streaming.cc @@ -205,6 +206,7 @@ libs: - test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc - test/core/end2end/tests/retry_recv_initial_metadata.cc - test/core/end2end/tests/retry_recv_message.cc + - test/core/end2end/tests/retry_send_op_fails.cc - test/core/end2end/tests/retry_server_pushback_delay.cc - test/core/end2end/tests/retry_server_pushback_disabled.cc - test/core/end2end/tests/retry_streaming.cc diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0694666498d..5833399998e 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2104,6 +2104,7 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc', 'test/core/end2end/tests/retry_recv_initial_metadata.cc', 'test/core/end2end/tests/retry_recv_message.cc', + 'test/core/end2end/tests/retry_send_op_fails.cc', 'test/core/end2end/tests/retry_server_pushback_delay.cc', 'test/core/end2end/tests/retry_server_pushback_disabled.cc', 'test/core/end2end/tests/retry_streaming.cc', diff --git a/grpc.gyp b/grpc.gyp index 3b2bd521dfa..e1eb2f543d5 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -249,6 +249,7 @@ 'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc', 'test/core/end2end/tests/retry_recv_initial_metadata.cc', 'test/core/end2end/tests/retry_recv_message.cc', + 'test/core/end2end/tests/retry_send_op_fails.cc', 'test/core/end2end/tests/retry_server_pushback_delay.cc', 'test/core/end2end/tests/retry_server_pushback_disabled.cc', 'test/core/end2end/tests/retry_streaming.cc', @@ -352,6 +353,7 @@ 'test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc', 'test/core/end2end/tests/retry_recv_initial_metadata.cc', 'test/core/end2end/tests/retry_recv_message.cc', + 'test/core/end2end/tests/retry_send_op_fails.cc', 'test/core/end2end/tests/retry_server_pushback_delay.cc', 'test/core/end2end/tests/retry_server_pushback_disabled.cc', 'test/core/end2end/tests/retry_streaming.cc', diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index fd24641c472..d1083f73a03 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -241,23 +241,26 @@ class RetryFilter::CallData { grpc_transport_stream_op_batch* batch() { return &batch_; } - // Adds retriable send_initial_metadata op to batch_data. + // Adds retriable send_initial_metadata op. void AddRetriableSendInitialMetadataOp(); - // Adds retriable send_message op to batch_data. + // Adds retriable send_message op. void AddRetriableSendMessageOp(); - // Adds retriable send_trailing_metadata op to batch_data. + // Adds retriable send_trailing_metadata op. void AddRetriableSendTrailingMetadataOp(); - // Adds retriable recv_initial_metadata op to batch_data. + // Adds retriable recv_initial_metadata op. void AddRetriableRecvInitialMetadataOp(); - // Adds retriable recv_message op to batch_data. + // Adds retriable recv_message op. void AddRetriableRecvMessageOp(); - // Adds retriable recv_trailing_metadata op to batch_data. + // Adds retriable recv_trailing_metadata op. void AddRetriableRecvTrailingMetadataOp(); + // Adds cancel_stream op. + void AddCancelStreamOp(); private: - // Returns true if the call is being retried. - bool MaybeRetry(grpc_status_code status, grpc_mdelem* server_pushback_md, - bool is_lb_drop); + // Returns true if the call should be retried. + // Populates server_pushback_ms if appropriate. + bool ShouldRetry(grpc_status_code status, grpc_mdelem* server_pushback_md, + bool is_lb_drop, grpc_millis* server_pushback_ms); // Frees cached send ops that were completed by the completed batch in // batch_data. Used when batches are completed after the call is @@ -280,9 +283,9 @@ class RetryFilter::CallData { // Adds recv_trailing_metadata_ready closure to closures. void AddClosureForRecvTrailingMetadataReady( grpc_error_handle error, CallCombinerClosureList* closures); - // Adds any necessary closures for deferred recv_initial_metadata and - // recv_message callbacks to closures. - void AddClosuresForDeferredRecvCallbacks( + // Adds any necessary closures for deferred batch completion + // callbacks to closures. + void AddClosuresForDeferredCompletionCallbacks( CallCombinerClosureList* closures); // For any pending batch containing an op that has not yet been started, // adds the pending batch's completion closures to closures. @@ -320,7 +323,8 @@ class RetryFilter::CallData { // on_complete callback will be set to point to on_complete(); // otherwise, the batch's on_complete callback will be null. BatchData* CreateBatch(int refcount, bool set_on_complete) { - return calld_->arena_->New(Ref(), refcount, set_on_complete); + return calld_->arena_->New(Ref(DEBUG_LOCATION, "CreateBatch"), + refcount, set_on_complete); } // If there are any cached send ops that need to be replayed on this @@ -404,7 +408,10 @@ class RetryFilter::CallData { grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE; RefCountedPtr recv_message_ready_deferred_batch_; grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE; + RefCountedPtr on_complete_deferred_batch_; + grpc_error_handle on_complete_error_ = GRPC_ERROR_NONE; RefCountedPtr recv_trailing_metadata_internal_batch_; + bool seen_recv_trailing_metadata_from_surface_ : 1; // NOTE: Do not move this next to the metadata bitfields above. That would // save space but will also result in a data race because compiler // will generate a 2 byte store which overwrites the meta-data @@ -443,8 +450,8 @@ class RetryFilter::CallData { // Commits the call so that no further retry attempts will be performed. void RetryCommit(CallAttempt* call_attempt); - // Starts a retry after appropriate back-off. - void DoRetry(grpc_millis server_pushback_ms); + // Starts a timer to retry after appropriate back-off. + void StartRetryTimer(grpc_millis server_pushback_ms); static void OnRetryTimer(void* arg, grpc_error_handle error); static void OnRetryTimerLocked(void* arg, grpc_error_handle error); @@ -588,7 +595,9 @@ class RetryFilter::CallData::CallStackDestructionBarrier // RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) - : calld_(calld), + : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt" + : nullptr), + calld_(calld), batch_payload_(calld->call_context_), started_send_initial_metadata_(false), completed_send_initial_metadata_(false), @@ -598,6 +607,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) completed_recv_initial_metadata_(false), started_recv_trailing_metadata_(false), completed_recv_trailing_metadata_(false), + seen_recv_trailing_metadata_from_surface_(false), retry_dispatched_(false) { lb_call_ = calld->CreateLoadBalancedCall(); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { @@ -675,7 +685,7 @@ void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() { calld_->chand_, calld_, this); } calld_->committed_call_ = std::move(lb_call_); - calld_->call_attempt_.reset(); + calld_->call_attempt_.reset(DEBUG_LOCATION, "MaybeSwitchToFastPath"); } } @@ -820,6 +830,7 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( continue; } if (batch->recv_trailing_metadata && started_recv_trailing_metadata_) { + seen_recv_trailing_metadata_from_surface_ = true; // If we previously completed a recv_trailing_metadata op // initiated by StartInternalRecvTrailingMetadata(), use the // result of that instead of trying to re-start this op. @@ -837,7 +848,10 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( // Ref will be released by callback. recv_trailing_metadata_internal_batch_.release(); } else { - recv_trailing_metadata_internal_batch_.reset(); + recv_trailing_metadata_internal_batch_.reset( + DEBUG_LOCATION, + "internally started recv_trailing_metadata batch pending and " + "recv_trailing_metadata started from surface"); } } continue; @@ -933,7 +947,15 @@ void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { RetryFilter::CallData::CallAttempt::BatchData::BatchData( RefCountedPtr attempt, int refcount, bool set_on_complete) - : RefCounted(nullptr, refcount), call_attempt_(std::move(attempt)) { + : RefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "BatchData" : nullptr, + refcount), + call_attempt_(std::move(attempt)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: creating batch %p", + call_attempt_->calld_->chand_, call_attempt_->calld_, + call_attempt_.get(), this); + } // We hold a ref to the call stack for every batch sent on a call attempt. // This is because some batches on the call attempt may not complete // until after all of the batches are completed at the surface (because @@ -951,6 +973,11 @@ RetryFilter::CallData::CallAttempt::BatchData::BatchData( } RetryFilter::CallData::CallAttempt::BatchData::~BatchData() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying batch %p", + call_attempt_->calld_->chand_, call_attempt_->calld_, + call_attempt_.get(), this); + } if (batch_.send_initial_metadata) { grpc_metadata_batch_destroy(&call_attempt_->send_initial_metadata_); } @@ -964,6 +991,7 @@ RetryFilter::CallData::CallAttempt::BatchData::~BatchData() { grpc_metadata_batch_destroy(&call_attempt_->recv_trailing_metadata_); } GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "Retry BatchData"); + call_attempt_.reset(DEBUG_LOCATION, "~BatchData"); } void RetryFilter::CallData::CallAttempt::BatchData:: @@ -985,24 +1013,14 @@ void RetryFilter::CallData::CallAttempt::BatchData:: } } -bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( - grpc_status_code status, grpc_mdelem* server_pushback_md, bool is_lb_drop) { +bool RetryFilter::CallData::CallAttempt::BatchData::ShouldRetry( + grpc_status_code status, grpc_mdelem* server_pushback_md, bool is_lb_drop, + grpc_millis* server_pushback_ms) { auto* calld = call_attempt_->calld_; // LB drops always inhibit retries. if (is_lb_drop) return false; // Get retry policy. if (calld->retry_policy_ == nullptr) return false; - // If we've already dispatched a retry from this call, return true. - // This catches the case where the batch has multiple callbacks - // (i.e., it includes either recv_message or recv_initial_metadata). - if (call_attempt_->retry_dispatched_) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { - gpr_log(GPR_INFO, - "chand=%p calld=%p attempt=%p: retry already dispatched", - calld->chand_, calld, call_attempt_.get()); - } - return true; - } // Check status. if (GPR_LIKELY(status == GRPC_STATUS_OK)) { if (calld->retry_throttle_data_ != nullptr) { @@ -1061,7 +1079,6 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( return false; } // Check server push-back. - grpc_millis server_pushback_ms = -1; if (server_pushback_md != nullptr) { // If the value is "-1" or any other unparseable string, we do not retry. uint32_t ms; @@ -1080,12 +1097,10 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( "chand=%p calld=%p attempt=%p: server push-back: retry in %u ms", calld->chand_, calld, call_attempt_.get(), ms); } - server_pushback_ms = static_cast(ms); + *server_pushback_ms = static_cast(ms); } } - // Do retry. - call_attempt_->retry_dispatched_ = true; - calld->DoRetry(server_pushback_ms); + // We should retry. return true; } @@ -1337,7 +1352,8 @@ void RetryFilter::CallData::CallAttempt::BatchData:: } void RetryFilter::CallData::CallAttempt::BatchData:: - AddClosuresForDeferredRecvCallbacks(CallCombinerClosureList* closures) { + AddClosuresForDeferredCompletionCallbacks( + CallCombinerClosureList* closures) { if (batch_.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY( @@ -1363,6 +1379,12 @@ void RetryFilter::CallData::CallAttempt::BatchData:: call_attempt_->recv_message_error_, "resuming recv_message_ready"); } + // Add closure for deferred on_complete. + if (GPR_UNLIKELY(call_attempt_->on_complete_deferred_batch_ != nullptr)) { + closures->Add(&call_attempt_->on_complete_deferred_batch_->on_complete_, + call_attempt_->on_complete_error_, "resuming on_complete"); + call_attempt_->on_complete_deferred_batch_.release(); + } } } @@ -1394,9 +1416,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall( CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. AddClosureForRecvTrailingMetadataReady(GRPC_ERROR_REF(error), &closures); - // If there are deferred recv_initial_metadata_ready or recv_message_ready - // callbacks, add them to closures. - AddClosuresForDeferredRecvCallbacks(&closures); + // If there are deferred batch completion callbacks, add them to closures. + AddClosuresForDeferredCompletionCallbacks(&closures); // Add closures to fail any pending batches that have not yet been started. AddClosuresToFailUnstartedPendingBatches(GRPC_ERROR_REF(error), &closures); // Schedule all of the closures identified above. @@ -1434,15 +1455,46 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( is_lb_drop); } // Check if we should retry. - if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) { - // Unref BatchData objects for deferred recv_initial_metadata_ready or - // recv_message_ready callbacks, if any. - call_attempt->recv_initial_metadata_ready_deferred_batch_.reset(); + grpc_millis server_pushback_ms = -1; + if (batch_data->ShouldRetry(status, server_pushback_md, is_lb_drop, + &server_pushback_ms)) { + // We're retrying. + // Unref batches for deferred completion callbacks that will now never + // be invoked. + if (!call_attempt->seen_recv_trailing_metadata_from_surface_) { + call_attempt->recv_trailing_metadata_internal_batch_.reset( + DEBUG_LOCATION, + "internal recv_trailing_metadata completed before that op was " + "started from the surface"); + } + call_attempt->recv_initial_metadata_ready_deferred_batch_.reset( + DEBUG_LOCATION, + "unref deferred recv_initial_metadata_ready batch due to retry"); GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_); call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_NONE; - call_attempt->recv_message_ready_deferred_batch_.reset(); + call_attempt->recv_message_ready_deferred_batch_.reset( + DEBUG_LOCATION, "unref deferred recv_message_ready batch due to retry"); GRPC_ERROR_UNREF(call_attempt->recv_message_error_); call_attempt->recv_message_error_ = GRPC_ERROR_NONE; + call_attempt->on_complete_deferred_batch_.reset( + DEBUG_LOCATION, "unref deferred on_complete batch due to retry"); + GRPC_ERROR_UNREF(call_attempt->on_complete_error_); + call_attempt->on_complete_error_ = GRPC_ERROR_NONE; + // Start retry timer. + call_attempt->retry_dispatched_ = true; + calld->StartRetryTimer(server_pushback_ms); + // Start a cancellation op on this call attempt to make sure the + // transport knows that this call should be cleaned up, even if it + // hasn't received any ops. + BatchData* cancel_batch_data = + call_attempt->CreateBatch(1, /*set_on_complete=*/true); + cancel_batch_data->AddCancelStreamOp(); + CallCombinerClosureList closures; + call_attempt->AddClosureForBatch(cancel_batch_data->batch(), + "start cancellation batch on call attempt", + &closures); + // Yields call combiner. + closures.RunClosures(calld->call_combiner_); return; } // Not retrying, so commit the call. @@ -1518,9 +1570,6 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( RefCountedPtr batch_data(static_cast(arg)); CallAttempt* call_attempt = batch_data->call_attempt_.get(); CallData* calld = call_attempt->calld_; - // TODO(roth): If error is not GRPC_ERROR_NONE, then we should defer - // sending the completion of this batch to the surface until we see - // recv_trailing_metadata_ready and decide whether we want to retry. if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: got on_complete, error=%s, batch=%s", @@ -1528,6 +1577,36 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( grpc_error_std_string(error).c_str(), grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str()); } + // If a retry was already dispatched, then we're not going to propagate + // the completion of this batch, so do nothing. + if (call_attempt->retry_dispatched_) { + GRPC_CALL_COMBINER_STOP(calld->call_combiner_, + "on_complete after retry dispatched"); + return; + } + // If we got an error and have not yet gotten the + // recv_trailing_metadata_ready callback, then defer propagating this + // callback back to the surface. We can evaluate whether to retry when + // recv_trailing_metadata comes back. + if (GPR_UNLIKELY(!calld->retry_committed_ && error != GRPC_ERROR_NONE && + !call_attempt->completed_recv_trailing_metadata_)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: deferring on_complete", + calld->chand_, calld, call_attempt); + } + call_attempt->on_complete_deferred_batch_ = std::move(batch_data); + call_attempt->on_complete_error_ = GRPC_ERROR_REF(error); + if (!call_attempt->started_recv_trailing_metadata_) { + // recv_trailing_metadata not yet started by application; start it + // ourselves to get status. + call_attempt->StartInternalRecvTrailingMetadata(); + } else { + GRPC_CALL_COMBINER_STOP( + calld->call_combiner_, + "on_complete failure before recv_trailing_metadata_ready"); + } + return; + } // Update bookkeeping in call_attempt. if (batch_data->batch_.send_initial_metadata) { call_attempt->completed_send_initial_metadata_ = true; @@ -1545,26 +1624,21 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( } // Construct list of closures to execute. CallCombinerClosureList closures; - // If a retry was already dispatched, that means we saw - // recv_trailing_metadata before this, so we do nothing here. - // Otherwise, invoke the callback to return the result to the surface. - if (!call_attempt->retry_dispatched_) { - // Add closure for the completed pending batch, if any. - batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error), - &closures); - // If needed, add a callback to start any replay or pending send ops on - // the LB call. - if (!call_attempt->completed_recv_trailing_metadata_) { - batch_data->AddClosuresForReplayOrPendingSendOps(&closures); - } - // If the call is committed and retry state is no longer needed, switch - // to fast path for subsequent batches. - // TODO(roth): As part of implementing hedging, this logic needs to - // check that *this* call attempt is the one that we've committed to. - // Might need to replace retry_dispatched_ with an enum indicating - // whether we're in flight, abandoned, or the winning call attempt. - if (calld->retry_committed_) call_attempt->MaybeSwitchToFastPath(); - } + // Add closure for the completed pending batch, if any. + batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error), + &closures); + // If needed, add a callback to start any replay or pending send ops on + // the LB call. + if (!call_attempt->completed_recv_trailing_metadata_) { + batch_data->AddClosuresForReplayOrPendingSendOps(&closures); + } + // If the call is committed and retry state is no longer needed, switch + // to fast path for subsequent batches. + // TODO(roth): As part of implementing hedging, this logic needs to + // check that *this* call attempt is the one that we've committed to. + // Might need to replace retry_dispatched_ with an enum indicating + // whether we're in flight, abandoned, or the winning call attempt. + if (calld->retry_committed_) call_attempt->MaybeSwitchToFastPath(); // Schedule all of the closures identified above. // Note: This yields the call combiner. closures.RunClosures(calld->call_combiner_); @@ -1704,6 +1778,12 @@ void RetryFilter::CallData::CallAttempt::BatchData:: &call_attempt_->recv_trailing_metadata_ready_; } +void RetryFilter::CallData::CallAttempt::BatchData::AddCancelStreamOp() { + batch_.cancel_stream = true; + batch_.payload->cancel_stream.cancel_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("retry attempt abandoned"); +} + // // CallData vtable functions // @@ -1849,7 +1929,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( // If the timer is pending, yield the call combiner and wait for it to // run, since we don't want to start another call attempt until it does. if (retry_timer_pending_) { - GRPC_CALL_COMBINER_STOP(call_combiner_, "RetryTimerPending"); + GRPC_CALL_COMBINER_STOP(call_combiner_, + "added pending batch while retry timer pending"); return; } // If we do not yet have a call attempt, create one. @@ -2164,9 +2245,9 @@ void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) { } } -void RetryFilter::CallData::DoRetry(grpc_millis server_pushback_ms) { +void RetryFilter::CallData::StartRetryTimer(grpc_millis server_pushback_ms) { // Reset call attempt. - call_attempt_.reset(); + call_attempt_.reset(DEBUG_LOCATION, "StartRetryTimer"); // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { @@ -2188,14 +2269,13 @@ void RetryFilter::CallData::DoRetry(grpc_millis server_pushback_ms) { GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer"); retry_timer_pending_ = true; grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_); - GRPC_CALL_COMBINER_STOP(call_combiner_, "RetryTimer"); } void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) { auto* calld = static_cast(arg); GRPC_CLOSURE_INIT(&calld->retry_closure_, OnRetryTimerLocked, calld, nullptr); GRPC_CALL_COMBINER_START(calld->call_combiner_, &calld->retry_closure_, - GRPC_ERROR_REF(error), "OnRetryTimer"); + GRPC_ERROR_REF(error), "retry timer fired"); } void RetryFilter::CallData::OnRetryTimerLocked(void* arg, diff --git a/test/core/end2end/end2end_nosec_tests.cc b/test/core/end2end/end2end_nosec_tests.cc index 277df1765ac..1bd3aca7c8b 100644 --- a/test/core/end2end/end2end_nosec_tests.cc +++ b/test/core/end2end/end2end_nosec_tests.cc @@ -147,6 +147,8 @@ extern void retry_recv_initial_metadata(grpc_end2end_test_config config); extern void retry_recv_initial_metadata_pre_init(void); extern void retry_recv_message(grpc_end2end_test_config config); extern void retry_recv_message_pre_init(void); +extern void retry_send_op_fails(grpc_end2end_test_config config); +extern void retry_send_op_fails_pre_init(void); extern void retry_server_pushback_delay(grpc_end2end_test_config config); extern void retry_server_pushback_delay_pre_init(void); extern void retry_server_pushback_disabled(grpc_end2end_test_config config); @@ -256,6 +258,7 @@ void grpc_end2end_tests_pre_init(void) { retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(); retry_recv_initial_metadata_pre_init(); retry_recv_message_pre_init(); + retry_send_op_fails_pre_init(); retry_server_pushback_delay_pre_init(); retry_server_pushback_disabled_pre_init(); retry_streaming_pre_init(); @@ -347,6 +350,7 @@ void grpc_end2end_tests(int argc, char **argv, retry_non_retriable_status_before_recv_trailing_metadata_started(config); retry_recv_initial_metadata(config); retry_recv_message(config); + retry_send_op_fails(config); retry_server_pushback_delay(config); retry_server_pushback_disabled(config); retry_streaming(config); @@ -610,6 +614,10 @@ void grpc_end2end_tests(int argc, char **argv, retry_recv_message(config); continue; } + if (0 == strcmp("retry_send_op_fails", argv[i])) { + retry_send_op_fails(config); + continue; + } if (0 == strcmp("retry_server_pushback_delay", argv[i])) { retry_server_pushback_delay(config); continue; diff --git a/test/core/end2end/end2end_tests.cc b/test/core/end2end/end2end_tests.cc index a00026496ae..6a79916efae 100644 --- a/test/core/end2end/end2end_tests.cc +++ b/test/core/end2end/end2end_tests.cc @@ -149,6 +149,8 @@ extern void retry_recv_initial_metadata(grpc_end2end_test_config config); extern void retry_recv_initial_metadata_pre_init(void); extern void retry_recv_message(grpc_end2end_test_config config); extern void retry_recv_message_pre_init(void); +extern void retry_send_op_fails(grpc_end2end_test_config config); +extern void retry_send_op_fails_pre_init(void); extern void retry_server_pushback_delay(grpc_end2end_test_config config); extern void retry_server_pushback_delay_pre_init(void); extern void retry_server_pushback_disabled(grpc_end2end_test_config config); @@ -259,6 +261,7 @@ void grpc_end2end_tests_pre_init(void) { retry_non_retriable_status_before_recv_trailing_metadata_started_pre_init(); retry_recv_initial_metadata_pre_init(); retry_recv_message_pre_init(); + retry_send_op_fails_pre_init(); retry_server_pushback_delay_pre_init(); retry_server_pushback_disabled_pre_init(); retry_streaming_pre_init(); @@ -351,6 +354,7 @@ void grpc_end2end_tests(int argc, char **argv, retry_non_retriable_status_before_recv_trailing_metadata_started(config); retry_recv_initial_metadata(config); retry_recv_message(config); + retry_send_op_fails(config); retry_server_pushback_delay(config); retry_server_pushback_disabled(config); retry_streaming(config); @@ -618,6 +622,10 @@ void grpc_end2end_tests(int argc, char **argv, retry_recv_message(config); continue; } + if (0 == strcmp("retry_send_op_fails", argv[i])) { + retry_send_op_fails(config); + continue; + } if (0 == strcmp("retry_server_pushback_delay", argv[i])) { retry_server_pushback_delay(config); continue; diff --git a/test/core/end2end/fixtures/h2_proxy.cc b/test/core/end2end/fixtures/h2_proxy.cc index c6fd2ec79e6..fcc508ef629 100644 --- a/test/core/end2end/fixtures/h2_proxy.cc +++ b/test/core/end2end/fixtures/h2_proxy.cc @@ -47,14 +47,7 @@ static grpc_server* create_proxy_server(const char* port, static grpc_channel* create_proxy_client(const char* target, grpc_channel_args* client_args) { - // Disable retries in proxy client. - const char* args_to_remove = GRPC_ARG_ENABLE_RETRIES; - grpc_channel_args* new_args = - grpc_channel_args_copy_and_remove(client_args, &args_to_remove, 1); - grpc_channel* channel = - grpc_insecure_channel_create(target, new_args, nullptr); - grpc_channel_args_destroy(new_args); - return channel; + return grpc_insecure_channel_create(target, client_args, nullptr); } static const grpc_end2end_proxy_def proxy_def = {create_proxy_server, diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 0e886dcb4cf..e8b9cefeee1 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -301,6 +301,7 @@ END2END_TESTS = { ), "retry_recv_initial_metadata": _test_options(needs_client_channel = True), "retry_recv_message": _test_options(needs_client_channel = True), + "retry_send_op_fails": _test_options(needs_client_channel = True), "retry_server_pushback_delay": _test_options(needs_client_channel = True), "retry_server_pushback_disabled": _test_options(needs_client_channel = True), "retry_streaming": _test_options(needs_client_channel = True), diff --git a/test/core/end2end/tests/retry_send_op_fails.cc b/test/core/end2end/tests/retry_send_op_fails.cc new file mode 100644 index 00000000000..8f70e8c0fee --- /dev/null +++ b/test/core/end2end/tests/retry_send_op_fails.cc @@ -0,0 +1,396 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/static_metadata.h" + +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests failure on a send op batch: +// - 2 retries allowed for ABORTED status +// - on the first call attempt, the batch containing the +// send_initial_metadata op fails, and then the call returns ABORTED, +// all without ever going out on the wire +// - second attempt returns ABORTED but does not retry, because only 2 +// attempts are allowed +static void test_retry_send_op_fails(grpc_end2end_test_config config) { + grpc_call* c; + grpc_call* s; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* request_payload_recv = nullptr; + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + char* peer; + + grpc_arg args[] = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_RETRIES), 1), + grpc_channel_arg_string_create( + const_cast(GRPC_ARG_SERVICE_CONFIG), + const_cast( + "{\n" + " \"methodConfig\": [ {\n" + " \"name\": [\n" + " { \"service\": \"service\", \"method\": \"method\" }\n" + " ],\n" + " \"retryPolicy\": {\n" + " \"maxAttempts\": 2,\n" + " \"initialBackoff\": \"1s\",\n" + " \"maxBackoff\": \"120s\",\n" + " \"backoffMultiplier\": 1.6,\n" + " \"retryableStatusCodes\": [ \"ABORTED\" ]\n" + " }\n" + " } ]\n" + "}")), + }; + grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args}; + grpc_end2end_test_fixture f = + begin_test(config, "retry_send_op_fails", &client_args, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != nullptr); + gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer); + gpr_free(peer); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + + // Start a batch containing send ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Start a batch containing recv ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client send ops should now complete. + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + cq_verify(cqv); + + // Server should get a call. + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + + // Server fails with status ABORTED. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_ABORTED; + op->data.send_status_from_server.status_details = &status_details; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // In principle, the server batch should complete before the client + // recv ops batch, but in the proxy fixtures, there are multiple threads + // involved, so the completion order tends to be a little racy. + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_ABORTED); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + GPR_ASSERT(was_cancelled == 0); + + // Make sure the "grpc-previous-rpc-attempts" header was sent in the retry. + bool found_retry_header = false; + for (size_t i = 0; i < request_metadata_recv.count; ++i) { + if (grpc_slice_eq(request_metadata_recv.metadata[i].key, + GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS)) { + GPR_ASSERT( + grpc_slice_eq(request_metadata_recv.metadata[i].value, GRPC_MDSTR_1)); + found_retry_header = true; + break; + } + } + GPR_ASSERT(found_retry_header); + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +namespace { + +// A filter that, for the first call it sees, will fail the batch +// containing send_initial_metadata and then fail the call with status +// ABORTED. All subsequent calls are allowed through without failures. +class FailFirstSendOpFilter { + public: + static grpc_channel_filter kFilterVtable; + + public: + class CallData { + public: + static grpc_error_handle Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(args); + return GRPC_ERROR_NONE; + } + + static void Destroy(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + auto* calld = static_cast(elem->call_data); + calld->~CallData(); + } + + static void StartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + auto* chand = static_cast(elem->channel_data); + auto* calld = static_cast(elem->call_data); + if (!chand->seen_first_) { + chand->seen_first_ = true; + calld->fail_ = true; + } + if (calld->fail_ && !batch->cancel_stream) { + grpc_transport_stream_op_batch_finish_with_failure( + batch, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "FailFirstSendOpFilter failing batch"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED), + calld->call_combiner_); + return; + } + grpc_call_next_op(elem, batch); + } + + private: + explicit CallData(const grpc_call_element_args* args) + : call_combiner_(args->call_combiner) {} + + grpc_core::CallCombiner* call_combiner_; + bool fail_ = false; + }; + + static grpc_error_handle Init(grpc_channel_element* elem, + grpc_channel_element_args* /*args*/) { + new (elem->channel_data) FailFirstSendOpFilter(); + return GRPC_ERROR_NONE; + } + + static void Destroy(grpc_channel_element* elem) { + auto* chand = static_cast(elem->channel_data); + chand->~FailFirstSendOpFilter(); + } + + bool seen_first_ = false; +}; + +grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { + CallData::StartTransportStreamOpBatch, + grpc_channel_next_op, + sizeof(CallData), + CallData::Init, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + CallData::Destroy, + sizeof(FailFirstSendOpFilter), + Init, + Destroy, + grpc_channel_next_get_info, + "FailFirstSendOpFilter", +}; + +bool g_enable_filter = false; + +bool MaybeAddFilter(grpc_channel_stack_builder* builder, void* /*arg*/) { + // Skip if filter is not enabled. + if (!g_enable_filter) return true; + // Skip on proxy (which explicitly disables retries). + const grpc_channel_args* args = + grpc_channel_stack_builder_get_channel_arguments(builder); + if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, true)) { + return true; + } + // Install filter. + return grpc_channel_stack_builder_prepend_filter( + builder, &FailFirstSendOpFilter::kFilterVtable, nullptr, nullptr); +} + +void InitPlugin(void) { + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, 0, MaybeAddFilter, + nullptr); +} + +void DestroyPlugin(void) {} + +} // namespace + +void retry_send_op_fails(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + g_enable_filter = true; + test_retry_send_op_fails(config); + g_enable_filter = false; +} + +void retry_send_op_fails_pre_init(void) { + grpc_register_plugin(InitPlugin, DestroyPlugin); +}