@@ -168,8 +168,8 @@ RetryFilter::LegacyCallData::CallAttempt::CallAttempt(
 
 RetryFilter::LegacyCallData::CallAttempt::~CallAttempt() {
   GRPC_TRACE_LOG(retry, INFO)
-      << "chand=" << calld_->chand_ << " calld=" << calld_ <<
-      " attempt=" << this << ": destroying call attempt";
+      << "chand=" << calld_->chand_ << " calld=" << calld_
+      << " attempt=" << this << ": destroying call attempt";
 }
 
 void RetryFilter::LegacyCallData::CallAttempt::
@@ -520,8 +520,8 @@ void RetryFilter::LegacyCallData::CallAttempt::AddRetriableBatches(
 
 void RetryFilter::LegacyCallData::CallAttempt::StartRetriableBatches() {
   GRPC_TRACE_LOG(retry, INFO)
-      << "chand=" << calld_->chand_ << " calld=" << calld_ <<
-      " attempt=" << this << ": constructing retriable batches";
+      << "chand=" << calld_->chand_ << " calld=" << calld_
+      << " attempt=" << this << ": constructing retriable batches";
   // Construct list of closures to execute, one for each pending batch.
   CallCombinerClosureList closures;
   AddRetriableBatches(&closures);
@@ -555,8 +555,8 @@ bool RetryFilter::LegacyCallData::CallAttempt::ShouldRetry(
         calld_->retry_throttle_data_->RecordSuccess();
       }
       GRPC_TRACE_LOG(retry, INFO)
-          << "chand=" << calld_->chand_ << " calld=" << calld_ <<
-          " attempt=" << this << ": call succeeded";
+          << "chand=" << calld_->chand_ << " calld=" << calld_
+          << " attempt=" << this << ": call succeeded";
       return false;
     }
     // Status is not OK. Check whether the status is retryable.
@@ -580,15 +580,15 @@ bool RetryFilter::LegacyCallData::CallAttempt::ShouldRetry(
   if (calld_->retry_throttle_data_ != nullptr &&
       !calld_->retry_throttle_data_->RecordFailure()) {
     GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << calld_->chand_ << " calld=" << calld_ <<
-        " attempt=" << this << ": retries throttled";
+        << "chand=" << calld_->chand_ << " calld=" << calld_
+        << " attempt=" << this << ": retries throttled";
     return false;
   }
   // Check whether the call is committed.
   if (calld_->retry_committed_) {
     GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << calld_->chand_ << " calld=" << calld_ <<
-        " attempt=" << this << ": retries already committed";
+        << "chand=" << calld_->chand_ << " calld=" << calld_
+        << " attempt=" << this << ": retries already committed";
     return false;
   }
   // Check whether we have retries remaining.
@@ -1291,8 +1291,8 @@ void RetryFilter::LegacyCallData::CallAttempt::BatchData::OnComplete(
   if (GPR_UNLIKELY(!calld->retry_committed_ && !error.ok() &&
                    !call_attempt->completed_recv_trailing_metadata_)) {
     GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << calld->chand_ << " calld=" << calld <<
-        " attempt=" << call_attempt << ": deferring on_complete";
+        << "chand=" << calld->chand_ << " calld=" << calld
+        << " attempt=" << call_attempt << ": deferring on_complete";
     call_attempt->on_complete_deferred_batches_.emplace_back(
         std::move(batch_data), error);
     CallCombinerClosureList closures;
@@ -1474,8 +1474,7 @@ grpc_error_handle RetryFilter::LegacyCallData::Init(
   auto* chand = static_cast<RetryFilter*>(elem->channel_data);
   new (elem->call_data) RetryFilter::LegacyCallData(chand, *args);
   GRPC_TRACE_LOG(retry, INFO)
-      << "chand=" << chand << " calld=" << elem->call_data <<
-      ": created call";
+      << "chand=" << chand << " calld=" << elem->call_data << ": created call";
   return absl::OkStatus();
 }
 
@@ -1601,9 +1600,8 @@ void RetryFilter::LegacyCallData::StartTransportStreamOpBatch(
     }
     // Cancel retry timer if needed.
     if (retry_timer_handle_.has_value()) {
-      GRPC_TRACE_LOG(retry, INFO)
-          << "chand=" << chand_ << " calld=" << this <<
-          ": cancelling retry timer";
+      GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                                  << ": cancelling retry timer";
       if (chand_->event_engine()->Cancel(*retry_timer_handle_)) {
         GRPC_CALL_STACK_UNREF(owning_call_, "OnRetryTimer");
       }
@@ -1665,15 +1663,15 @@ void RetryFilter::LegacyCallData::StartTransportStreamOpBatch(
     // The attempt will automatically start any necessary replays or
     // pending batches.
     GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << chand_ << " calld=" << this <<
-        ": creating call attempt";
+        << "chand=" << chand_ << " calld=" << this << ": creating call attempt";
     retry_codepath_started_ = true;
     CreateCallAttempt(/*is_transparent_retry=*/false);
     return;
   }
   // Send batches to call attempt.
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": starting batch on attempt=" << call_attempt_.get();
+  GRPC_TRACE_LOG(retry, INFO)
+      << "chand=" << chand_ << " calld=" << this
+      << ": starting batch on attempt=" << call_attempt_.get();
   call_attempt_->StartRetriableBatches();
 }
 
@@ -1728,23 +1726,22 @@ void RetryFilter::LegacyCallData::MaybeCacheSendOpsForBatch(
 }
 
 void RetryFilter::LegacyCallData::FreeCachedSendInitialMetadata() {
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": destroying send_initial_metadata";
+  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                              << ": destroying send_initial_metadata";
   send_initial_metadata_.Clear();
 }
 
 void RetryFilter::LegacyCallData::FreeCachedSendMessage(size_t idx) {
   if (send_messages_[idx].slices != nullptr) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << chand_ << " calld=" << this <<
-        ": destroying send_messages[" << idx << "]";
+    GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                                << ": destroying send_messages[" << idx << "]";
     Destruct(std::exchange(send_messages_[idx].slices, nullptr));
   }
 }
 
 void RetryFilter::LegacyCallData::FreeCachedSendTrailingMetadata() {
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": destroying send_trailing_metadata";
+  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                              << ": destroying send_trailing_metadata";
   send_trailing_metadata_.Clear();
 }
 
@@ -1780,8 +1777,8 @@ RetryFilter::LegacyCallData::PendingBatch*
 RetryFilter::LegacyCallData::PendingBatchesAdd(
     grpc_transport_stream_op_batch* batch) {
   const size_t idx = GetBatchIndex(batch);
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": adding pending batch at index " << idx;
+  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                              << ": adding pending batch at index " << idx;
   PendingBatch* pending = &pending_batches_[idx];
   CHECK_EQ(pending->batch, nullptr);
   pending->batch = batch;
@@ -1808,9 +1805,8 @@ RetryFilter::LegacyCallData::PendingBatchesAdd(
   // ops have already been sent, and we commit to that attempt.
   if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                    chand_->per_rpc_retry_buffer_size())) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << chand_ << " calld=" << this <<
-        ": exceeded retry buffer size, committing";
+    GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                                << ": exceeded retry buffer size, committing";
     RetryCommit(call_attempt_.get());
   }
   return pending;
@@ -1843,9 +1839,8 @@ void RetryFilter::LegacyCallData::MaybeClearPendingBatch(
       (!batch->recv_trailing_metadata ||
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
-    GRPC_TRACE_LOG(retry, INFO)
-        << "chand=" << chand_ << " calld=" << this <<
-        ": clearing pending batch";
+    GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                                << ": clearing pending batch";
     PendingBatchClear(pending);
   }
 }
@@ -1899,8 +1894,8 @@ RetryFilter::LegacyCallData::PendingBatchFind(const char* log_message,
     grpc_transport_stream_op_batch* batch = pending->batch;
     if (batch != nullptr && predicate(batch)) {
       GRPC_TRACE_LOG(retry, INFO)
-          << "chand=" << chand_ << " calld=" << this << ": " <<
-          log_message << " pending batch at index " << i;
+          << "chand=" << chand_ << " calld=" << this << ": " << log_message
+          << " pending batch at index " << i;
       return pending;
     }
   }
@@ -1914,8 +1909,8 @@ RetryFilter::LegacyCallData::PendingBatchFind(const char* log_message,
 void RetryFilter::LegacyCallData::RetryCommit(CallAttempt* call_attempt) {
   if (retry_committed_) return;
   retry_committed_ = true;
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": committing retries";
+  GRPC_TRACE_LOG(retry, INFO)
+      << "chand=" << chand_ << " calld=" << this << ": committing retries";
   if (call_attempt != nullptr) {
     // If the call attempt's LB call has been committed, invoke the
     // call's on_commit callback.
@@ -1980,8 +1975,8 @@ void RetryFilter::LegacyCallData::OnRetryTimerLocked(
 
 void RetryFilter::LegacyCallData::AddClosureToStartTransparentRetry(
     CallCombinerClosureList* closures) {
-  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this <<
-      ": scheduling transparent retry";
+  GRPC_TRACE_LOG(retry, INFO) << "chand=" << chand_ << " calld=" << this
+                              << ": scheduling transparent retry";
   GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
   GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr);
   closures->Add(&retry_closure_, absl::OkStatus(), "start transparent retry");