|
|
|
@ -330,6 +330,11 @@ class RetryFilter::CallData { |
|
|
|
|
// Otherwise, returns nullptr.
|
|
|
|
|
BatchData* MaybeCreateBatchForReplay(); |
|
|
|
|
|
|
|
|
|
// Adds a closure to closures that will execute batch in the call combiner.
|
|
|
|
|
void AddClosureForBatch(grpc_transport_stream_op_batch* batch, |
|
|
|
|
const char* reason, |
|
|
|
|
CallCombinerClosureList* closures); |
|
|
|
|
|
|
|
|
|
// Adds batches for pending batches to closures.
|
|
|
|
|
void AddBatchesForPendingBatches(CallCombinerClosureList* closures); |
|
|
|
|
|
|
|
|
@ -440,8 +445,11 @@ class RetryFilter::CallData { |
|
|
|
|
|
|
|
|
|
void CreateCallAttempt(); |
|
|
|
|
|
|
|
|
|
// Adds a closure to closures that will execute batch in the call combiner.
|
|
|
|
|
// Adds a closure to closures that will execute batch on lb_call in the
|
|
|
|
|
// call combiner.
|
|
|
|
|
void AddClosureForBatch(grpc_transport_stream_op_batch* batch, |
|
|
|
|
ClientChannel::LoadBalancedCall* lb_call, |
|
|
|
|
const char* reason, |
|
|
|
|
CallCombinerClosureList* closures); |
|
|
|
|
|
|
|
|
|
RetryFilter* chand_; |
|
|
|
@ -630,7 +638,7 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) |
|
|
|
|
retry_dispatched_(false) { |
|
|
|
|
lb_call_ = calld->CreateLoadBalancedCall(); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: attempt=%p: create lb_call=%p", |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p", |
|
|
|
|
calld->chand_, calld, this, lb_call_.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -676,9 +684,9 @@ bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted( |
|
|
|
|
void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: call failed but recv_trailing_metadata not " |
|
|
|
|
"started; starting it internally", |
|
|
|
|
calld_->chand_, calld_); |
|
|
|
|
"chand=%p calld=%p attempt=%p: call failed but " |
|
|
|
|
"recv_trailing_metadata not started; starting it internally", |
|
|
|
|
calld_->chand_, calld_, this); |
|
|
|
|
} |
|
|
|
|
// Create batch_data with 2 refs, since this batch will be unreffed twice:
|
|
|
|
|
// once for the recv_trailing_metadata_ready callback when the batch
|
|
|
|
@ -702,9 +710,9 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { |
|
|
|
|
!calld_->pending_send_initial_metadata_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: replaying previously completed " |
|
|
|
|
"chand=%p calld=%p attempt=%p: replaying previously completed " |
|
|
|
|
"send_initial_metadata op", |
|
|
|
|
calld_->chand_, calld_); |
|
|
|
|
calld_->chand_, calld_, this); |
|
|
|
|
} |
|
|
|
|
replay_batch_data = CreateBatch(1, true /* set_on_complete */); |
|
|
|
|
replay_batch_data->AddRetriableSendInitialMetadataOp(); |
|
|
|
@ -716,9 +724,9 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { |
|
|
|
|
!calld_->pending_send_message_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: replaying previously completed " |
|
|
|
|
"chand=%p calld=%p attempt=%p: replaying previously completed " |
|
|
|
|
"send_message op", |
|
|
|
|
calld_->chand_, calld_); |
|
|
|
|
calld_->chand_, calld_, this); |
|
|
|
|
} |
|
|
|
|
if (replay_batch_data == nullptr) { |
|
|
|
|
replay_batch_data = CreateBatch(1, true /* set_on_complete */); |
|
|
|
@ -735,9 +743,9 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { |
|
|
|
|
!calld_->pending_send_trailing_metadata_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: replaying previously completed " |
|
|
|
|
"chand=%p calld=%p attempt=%p: replaying previously completed " |
|
|
|
|
"send_trailing_metadata op", |
|
|
|
|
calld_->chand_, calld_); |
|
|
|
|
calld_->chand_, calld_, this); |
|
|
|
|
} |
|
|
|
|
if (replay_batch_data == nullptr) { |
|
|
|
|
replay_batch_data = CreateBatch(1, true /* set_on_complete */); |
|
|
|
@ -747,6 +755,17 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() { |
|
|
|
|
return replay_batch_data; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RetryFilter::CallData::CallAttempt::AddClosureForBatch( |
|
|
|
|
grpc_transport_stream_op_batch* batch, const char* reason, |
|
|
|
|
CallCombinerClosureList* closures) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: adding batch (%s): %s", |
|
|
|
|
calld_->chand_, calld_, this, reason, |
|
|
|
|
grpc_transport_stream_op_batch_string(batch).c_str()); |
|
|
|
|
} |
|
|
|
|
calld_->AddClosureForBatch(batch, lb_call_.get(), reason, closures); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( |
|
|
|
|
CallCombinerClosureList* closures) { |
|
|
|
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) { |
|
|
|
@ -811,7 +830,10 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( |
|
|
|
|
} |
|
|
|
|
// If we're already committed, just send the batch as-is.
|
|
|
|
|
if (calld_->retry_committed_) { |
|
|
|
|
calld_->AddClosureForBatch(batch, closures); |
|
|
|
|
AddClosureForBatch( |
|
|
|
|
batch, |
|
|
|
|
"start non-replayable pending batch on call attempt after commit", |
|
|
|
|
closures); |
|
|
|
|
calld_->PendingBatchClear(pending); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
@ -852,7 +874,9 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches( |
|
|
|
|
if (batch->recv_trailing_metadata) { |
|
|
|
|
batch_data->AddRetriableRecvTrailingMetadataOp(); |
|
|
|
|
} |
|
|
|
|
calld_->AddClosureForBatch(batch_data->batch(), closures); |
|
|
|
|
AddClosureForBatch(batch_data->batch(), |
|
|
|
|
"start replayable pending batch on call attempt", |
|
|
|
|
closures); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -861,7 +885,8 @@ void RetryFilter::CallData::CallAttempt::AddRetriableBatches( |
|
|
|
|
// Replay previously-returned send_* ops if needed.
|
|
|
|
|
BatchData* replay_batch_data = MaybeCreateBatchForReplay(); |
|
|
|
|
if (replay_batch_data != nullptr) { |
|
|
|
|
calld_->AddClosureForBatch(replay_batch_data->batch(), closures); |
|
|
|
|
AddClosureForBatch(replay_batch_data->batch(), |
|
|
|
|
"start replay batch on call attempt", closures); |
|
|
|
|
} |
|
|
|
|
// Now add pending batches.
|
|
|
|
|
AddBatchesForPendingBatches(closures); |
|
|
|
@ -869,8 +894,9 @@ void RetryFilter::CallData::CallAttempt::AddRetriableBatches( |
|
|
|
|
|
|
|
|
|
void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches", |
|
|
|
|
calld_->chand_, calld_); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: constructing retriable batches", |
|
|
|
|
calld_->chand_, calld_, this); |
|
|
|
|
} |
|
|
|
|
// Construct list of closures to execute, one for each pending batch.
|
|
|
|
|
CallCombinerClosureList closures; |
|
|
|
@ -879,9 +905,9 @@ void RetryFilter::CallData::CallAttempt::StartRetriableBatches() { |
|
|
|
|
// Start batches on LB call.
|
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting %" PRIuPTR |
|
|
|
|
"chand=%p calld=%p attempt=%p: starting %" PRIuPTR |
|
|
|
|
" retriable batches on lb_call=%p", |
|
|
|
|
calld_->chand_, calld_, closures.size(), lb_call()); |
|
|
|
|
calld_->chand_, calld_, this, closures.size(), lb_call()); |
|
|
|
|
} |
|
|
|
|
closures.RunClosures(calld_->call_combiner_); |
|
|
|
|
} |
|
|
|
@ -956,8 +982,9 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
// (i.e., it includes either recv_message or recv_initial_metadata).
|
|
|
|
|
if (call_attempt_->retry_dispatched_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: retry already dispatched", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
@ -967,8 +994,8 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
calld->retry_throttle_data_->RecordSuccess(); |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", calld->chand_, |
|
|
|
|
calld); |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: call succeeded", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -976,8 +1003,10 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
if (!calld->retry_policy_->retryable_status_codes().Contains(status)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: status %s not configured as retryable", |
|
|
|
|
calld->chand_, calld, grpc_status_code_to_string(status)); |
|
|
|
|
"chand=%p calld=%p attempt=%p: status %s not configured as " |
|
|
|
|
"retryable", |
|
|
|
|
calld->chand_, calld, call_attempt_.get(), |
|
|
|
|
grpc_status_code_to_string(status)); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -991,16 +1020,17 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
if (calld->retry_throttle_data_ != nullptr && |
|
|
|
|
!calld->retry_throttle_data_->RecordFailure()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", calld->chand_, |
|
|
|
|
calld); |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: retries throttled", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
// Check whether the call is committed.
|
|
|
|
|
if (calld->retry_committed_) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: retries already committed", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -1008,8 +1038,10 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
++calld->num_attempts_completed_; |
|
|
|
|
if (calld->num_attempts_completed_ >= calld->retry_policy_->max_attempts()) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", |
|
|
|
|
calld->chand_, calld, calld->retry_policy_->max_attempts()); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: exceeded %d retry attempts", |
|
|
|
|
calld->chand_, calld, call_attempt_.get(), |
|
|
|
|
calld->retry_policy_->max_attempts()); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -1021,14 +1053,17 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry( |
|
|
|
|
if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: not retrying due to server push-back", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
"chand=%p calld=%p attempt=%p: not retrying due to server " |
|
|
|
|
"push-back", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} else { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms", |
|
|
|
|
calld->chand_, calld, ms); |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: server push-back: retry in %u ms", |
|
|
|
|
calld->chand_, calld, call_attempt_.get(), ms); |
|
|
|
|
} |
|
|
|
|
server_pushback_ms = static_cast<grpc_millis>(ms); |
|
|
|
|
} |
|
|
|
@ -1082,8 +1117,10 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady( |
|
|
|
|
CallData* calld = call_attempt->calld_; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: got recv_initial_metadata_ready, error=%s", |
|
|
|
|
calld->chand_, calld, grpc_error_std_string(error).c_str()); |
|
|
|
|
"chand=%p calld=%p attempt=%p: got recv_initial_metadata_ready, " |
|
|
|
|
"error=%s", |
|
|
|
|
calld->chand_, calld, call_attempt, |
|
|
|
|
grpc_error_std_string(error).c_str()); |
|
|
|
|
} |
|
|
|
|
call_attempt->completed_recv_initial_metadata_ = true; |
|
|
|
|
// If a retry was already dispatched, then we're not going to use the
|
|
|
|
@ -1104,9 +1141,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady( |
|
|
|
|
!call_attempt->completed_recv_trailing_metadata_)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: deferring recv_initial_metadata_ready " |
|
|
|
|
"(Trailers-Only)", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
"chand=%p calld=%p attempt=%p: deferring " |
|
|
|
|
"recv_initial_metadata_ready (Trailers-Only)", |
|
|
|
|
calld->chand_, calld, call_attempt); |
|
|
|
|
} |
|
|
|
|
call_attempt->recv_initial_metadata_ready_deferred_batch_ = |
|
|
|
|
std::move(batch_data); |
|
|
|
@ -1168,8 +1205,10 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady( |
|
|
|
|
CallAttempt* call_attempt = batch_data->call_attempt_.get(); |
|
|
|
|
CallData* calld = call_attempt->calld_; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s", |
|
|
|
|
calld->chand_, calld, grpc_error_std_string(error).c_str()); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: got recv_message_ready, error=%s", |
|
|
|
|
calld->chand_, calld, call_attempt, |
|
|
|
|
grpc_error_std_string(error).c_str()); |
|
|
|
|
} |
|
|
|
|
++call_attempt->completed_recv_message_count_; |
|
|
|
|
// If a retry was already dispatched, then we're not going to use the
|
|
|
|
@ -1189,9 +1228,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady( |
|
|
|
|
!call_attempt->completed_recv_trailing_metadata_)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: deferring recv_message_ready (nullptr " |
|
|
|
|
"message and recv_trailing_metadata pending)", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
"chand=%p calld=%p attempt=%p: deferring recv_message_ready " |
|
|
|
|
"(nullptr message and recv_trailing_metadata pending)", |
|
|
|
|
calld->chand_, calld, call_attempt); |
|
|
|
|
} |
|
|
|
|
call_attempt->recv_message_ready_deferred_batch_ = std::move(batch_data); |
|
|
|
|
call_attempt->recv_message_error_ = GRPC_ERROR_REF(error); |
|
|
|
@ -1315,9 +1354,9 @@ void RetryFilter::CallData::CallAttempt::BatchData:: |
|
|
|
|
if (call_attempt_->PendingBatchIsUnstarted(pending)) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: failing unstarted pending batch at " |
|
|
|
|
"index %" PRIuPTR, |
|
|
|
|
calld->chand_, calld, i); |
|
|
|
|
"chand=%p calld=%p attempt=%p: failing unstarted pending " |
|
|
|
|
"batch at index %" PRIuPTR, |
|
|
|
|
calld->chand_, calld, call_attempt_.get(), i); |
|
|
|
|
} |
|
|
|
|
closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error), |
|
|
|
|
"failing on_complete for pending batch"); |
|
|
|
@ -1352,8 +1391,10 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( |
|
|
|
|
CallData* calld = call_attempt->calld_; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s", |
|
|
|
|
calld->chand_, calld, grpc_error_std_string(error).c_str()); |
|
|
|
|
"chand=%p calld=%p attempt=%p: got recv_trailing_metadata_ready, " |
|
|
|
|
"error=%s", |
|
|
|
|
calld->chand_, calld, call_attempt, |
|
|
|
|
grpc_error_std_string(error).c_str()); |
|
|
|
|
} |
|
|
|
|
call_attempt->completed_recv_trailing_metadata_ = true; |
|
|
|
|
// Get the call's status and check for server pushback metadata.
|
|
|
|
@ -1366,8 +1407,10 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( |
|
|
|
|
&server_pushback_md, &is_lb_drop); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, "chand=%p calld=%p: call finished, status=%s is_lb_drop=%d", |
|
|
|
|
calld->chand_, calld, grpc_status_code_to_string(status), is_lb_drop); |
|
|
|
|
GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: call finished, status=%s is_lb_drop=%d", |
|
|
|
|
calld->chand_, calld, call_attempt, grpc_status_code_to_string(status), |
|
|
|
|
is_lb_drop); |
|
|
|
|
} |
|
|
|
|
// Check if we should retry.
|
|
|
|
|
if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) { |
|
|
|
@ -1443,8 +1486,9 @@ void RetryFilter::CallData::CallAttempt::BatchData:: |
|
|
|
|
if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting next batch for pending send op(s)", |
|
|
|
|
calld->chand_, calld); |
|
|
|
|
"chand=%p calld=%p attempt=%p: starting next batch for pending " |
|
|
|
|
"send op(s)", |
|
|
|
|
calld->chand_, calld, call_attempt_.get()); |
|
|
|
|
} |
|
|
|
|
call_attempt_->AddRetriableBatches(closures); |
|
|
|
|
} |
|
|
|
@ -1456,8 +1500,10 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete( |
|
|
|
|
CallAttempt* call_attempt = batch_data->call_attempt_.get(); |
|
|
|
|
CallData* calld = call_attempt->calld_; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s", |
|
|
|
|
calld->chand_, calld, grpc_error_std_string(error).c_str(), |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: got on_complete, error=%s, batch=%s", |
|
|
|
|
calld->chand_, calld, call_attempt, |
|
|
|
|
grpc_error_std_string(error).c_str(), |
|
|
|
|
grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str()); |
|
|
|
|
} |
|
|
|
|
// Update bookkeeping in call_attempt.
|
|
|
|
@ -1552,9 +1598,12 @@ void RetryFilter::CallData::CallAttempt::BatchData:: |
|
|
|
|
AddRetriableSendMessageOp() { |
|
|
|
|
auto* calld = call_attempt_->calld_; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", |
|
|
|
|
calld->chand_, calld, call_attempt_->started_send_message_count_); |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"chand=%p calld=%p attempt=%p: starting calld->send_messages[%" PRIuPTR |
|
|
|
|
"]", |
|
|
|
|
calld->chand_, calld, call_attempt_.get(), |
|
|
|
|
call_attempt_->started_send_message_count_); |
|
|
|
|
} |
|
|
|
|
ByteStreamCache* cache = |
|
|
|
|
calld->send_messages_[call_attempt_->started_send_message_count_]; |
|
|
|
@ -1635,7 +1684,8 @@ grpc_error_handle RetryFilter::CallData::Init( |
|
|
|
|
auto* chand = static_cast<RetryFilter*>(elem->channel_data); |
|
|
|
|
new (elem->call_data) CallData(chand, *args); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p: created call=%p", chand, elem->call_data); |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand, |
|
|
|
|
elem->call_data); |
|
|
|
|
} |
|
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
|
} |
|
|
|
@ -1824,16 +1874,13 @@ void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
void RetryFilter::CallData::AddClosureForBatch( |
|
|
|
|
grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) { |
|
|
|
|
batch->handler_private.extra_arg = call_attempt_->lb_call(); |
|
|
|
|
grpc_transport_stream_op_batch* batch, |
|
|
|
|
ClientChannel::LoadBalancedCall* lb_call, const char* reason, |
|
|
|
|
CallCombinerClosureList* closures) { |
|
|
|
|
batch->handler_private.extra_arg = lb_call; |
|
|
|
|
GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, |
|
|
|
|
batch, grpc_schedule_on_exec_ctx); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on LB call: %s", |
|
|
|
|
chand_, this, grpc_transport_stream_op_batch_string(batch).c_str()); |
|
|
|
|
} |
|
|
|
|
closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, |
|
|
|
|
"start_batch_on_lb_call"); |
|
|
|
|
closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1898,7 +1945,7 @@ void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) { |
|
|
|
|
|
|
|
|
|
void RetryFilter::CallData::FreeCachedSendTrailingMetadata() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "chand_=%p calld=%p: destroying send_trailing_metadata", |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata", |
|
|
|
|
chand_, this); |
|
|
|
|
} |
|
|
|
|
grpc_metadata_batch_destroy(&send_trailing_metadata_); |
|
|
|
@ -1937,7 +1984,7 @@ RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd( |
|
|
|
|
const size_t idx = GetBatchIndex(batch); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand_=%p calld=%p: adding pending batch at index %" PRIuPTR, |
|
|
|
|
"chand=%p calld=%p: adding pending batch at index %" PRIuPTR, |
|
|
|
|
chand_, this, idx); |
|
|
|
|
} |
|
|
|
|
PendingBatch* pending = &pending_batches_[idx]; |
|
|
|
|