clean up retry filter refcount handling (#26382)

pull/26383/head
Mark D. Roth 4 years ago committed by GitHub
parent a99ead4bf4
commit 3c55491043
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 128
      src/core/ext/filters/client_channel/retry_filter.cc

@ -235,7 +235,7 @@ class RetryFilter::CallData {
// We allocate one struct on the arena for each attempt at starting a
// batch on a given LB call.
class BatchData
: public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> {
: public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> {
public:
BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
bool set_on_complete);
@ -389,11 +389,11 @@ class RetryFilter::CallData {
bool started_recv_trailing_metadata_ : 1;
bool completed_recv_trailing_metadata_ : 1;
// State for callback processing.
BatchData* recv_initial_metadata_ready_deferred_batch_ = nullptr;
RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
BatchData* recv_message_ready_deferred_batch_ = nullptr;
RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE;
BatchData* recv_trailing_metadata_internal_batch_ = nullptr;
RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
// NOTE: Do not move this next to the metadata bitfields above. That would
// save space but will also result in a data race because compiler
// will generate a 2 byte store which overwrites the meta-data
@ -493,16 +493,6 @@ class RetryFilter::CallData {
grpc_timer retry_timer_ ABSL_GUARDED_BY(timer_mu_);
grpc_closure retry_closure_;
// The number of batches containing send ops that are currently in-flight
// on any call attempt.
// We hold a ref to the call stack while this is non-zero, since replay
// batches may not complete until after all callbacks have been returned
// to the surface, and we need to make sure that the call is not destroyed
// until all of these batches have completed.
// Note that we actually only need to track replay batches, but it's
// easier to track all batches with send ops.
int num_in_flight_call_attempt_send_batches_ = 0;
// Cached data for retrying send ops.
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
@ -696,7 +686,7 @@ void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() {
// op from the surface.
BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
batch_data->AddRetriableRecvTrailingMetadataOp();
recv_trailing_metadata_internal_batch_ = batch_data;
recv_trailing_metadata_internal_batch_.reset(batch_data);
// Note: This will release the call combiner.
lb_call_->StartTransportStreamOpBatch(batch_data->batch());
}
@ -811,10 +801,11 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
&recv_trailing_metadata_ready_, GRPC_ERROR_NONE,
"re-executing recv_trailing_metadata_ready to propagate "
"internally triggered result");
// Ref will be released by callback.
recv_trailing_metadata_internal_batch_.release();
} else {
recv_trailing_metadata_internal_batch_->Unref();
recv_trailing_metadata_internal_batch_.reset();
}
recv_trailing_metadata_internal_batch_ = nullptr;
}
continue;
}
@ -831,7 +822,7 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
batch->recv_message +
batch->recv_trailing_metadata;
CallAttempt::BatchData* batch_data =
BatchData* batch_data =
CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
// Cache send ops if needed.
calld_->MaybeCacheSendOpsForBatch(pending);
@ -862,15 +853,6 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
batch_data->AddRetriableRecvTrailingMetadataOp();
}
calld_->AddClosureForBatch(batch_data->batch(), closures);
// Track number of in-flight send batches.
// If this is the first one, take a ref to the call stack.
if (batch->send_initial_metadata || batch->send_message ||
batch->send_trailing_metadata) {
if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
}
++calld_->num_in_flight_call_attempt_send_batches_;
}
}
}
@ -880,12 +862,6 @@ void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
BatchData* replay_batch_data = MaybeCreateBatchForReplay();
if (replay_batch_data != nullptr) {
calld_->AddClosureForBatch(replay_batch_data->batch(), closures);
// Track number of pending send batches.
// If this is the first one, take a ref to the call stack.
if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
}
++calld_->num_in_flight_call_attempt_send_batches_;
}
// Now add pending batches.
AddBatchesForPendingBatches(closures);
@ -917,14 +893,14 @@ void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
RetryFilter::CallData::CallAttempt::BatchData::BatchData(
RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
: RefCounted(nullptr, refcount), call_attempt_(std::move(attempt)) {
// TODO(roth): Consider holding this ref on the call stack in
// CallAttempt instead of here in BatchData. This would eliminate the
// need for CallData::num_in_flight_call_attempt_send_batches_.
// But it would require having a way to unref CallAttempt when it is
// no longer needed (i.e., when the call is committed and all cached
// send ops have been replayed and the LB call is moved into
// CallData::committed_call_).
GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "CallAttempt");
// We hold a ref to the call stack for every batch sent on a call attempt.
// This is because some batches on the call attempt may not complete
// until after all of the batches are completed at the surface (because
// each batch that is pending at the surface holds a ref). This
// can happen for replayed send ops, and it can happen for
// recv_initial_metadata and recv_message ops on a call attempt that has
// been abandoned.
GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "Retry BatchData");
batch_.payload = &call_attempt_->batch_payload_;
if (set_on_complete) {
GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
@ -946,7 +922,7 @@ RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
if (batch_.recv_trailing_metadata) {
grpc_metadata_batch_destroy(&call_attempt_->recv_trailing_metadata_);
}
GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "CallAttempt");
GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "Retry BatchData");
}
void RetryFilter::CallData::CallAttempt::BatchData::
@ -1069,7 +1045,7 @@ bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry(
void RetryFilter::CallData::CallAttempt::BatchData::
InvokeRecvInitialMetadataCallback(void* arg, grpc_error_handle error) {
auto* batch_data = static_cast<CallAttempt::BatchData*>(arg);
auto* batch_data = static_cast<BatchData*>(arg);
auto* call_attempt = batch_data->call_attempt_.get();
// Find pending batch.
PendingBatch* pending = call_attempt->calld_->PendingBatchFind(
@ -1101,8 +1077,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
CallAttempt::BatchData* batch_data =
static_cast<CallAttempt::BatchData*>(arg);
RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
CallAttempt* call_attempt = batch_data->call_attempt_.get();
CallData* calld = call_attempt->calld_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -1133,7 +1108,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
"(Trailers-Only)",
calld->chand_, calld);
}
call_attempt->recv_initial_metadata_ready_deferred_batch_ = batch_data;
call_attempt->recv_initial_metadata_ready_deferred_batch_ =
std::move(batch_data);
call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error);
if (!call_attempt->started_recv_trailing_metadata_) {
// recv_trailing_metadata not yet started by application; start it
@ -1151,7 +1127,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
}
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
InvokeRecvInitialMetadataCallback(batch_data, error);
InvokeRecvInitialMetadataCallback(batch_data.release(), error);
}
//
@ -1160,8 +1136,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
void RetryFilter::CallData::CallAttempt::BatchData::InvokeRecvMessageCallback(
void* arg, grpc_error_handle error) {
CallAttempt::BatchData* batch_data =
static_cast<CallAttempt::BatchData*>(arg);
auto* batch_data = static_cast<BatchData*>(arg);
CallAttempt* call_attempt = batch_data->call_attempt_.get();
CallData* calld = call_attempt->calld_;
// Find pending op.
@ -1189,8 +1164,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::InvokeRecvMessageCallback(
void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
void* arg, grpc_error_handle error) {
CallAttempt::BatchData* batch_data =
static_cast<CallAttempt::BatchData*>(arg);
RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
CallAttempt* call_attempt = batch_data->call_attempt_.get();
CallData* calld = call_attempt->calld_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -1219,7 +1193,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
"message and recv_trailing_metadata pending)",
calld->chand_, calld);
}
call_attempt->recv_message_ready_deferred_batch_ = batch_data;
call_attempt->recv_message_ready_deferred_batch_ = std::move(batch_data);
call_attempt->recv_message_error_ = GRPC_ERROR_REF(error);
if (!call_attempt->started_recv_trailing_metadata_) {
// recv_trailing_metadata not yet started by application; start it
@ -1236,7 +1210,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
}
// Invoke the callback to return the result to the surface.
// Manually invoking a callback function; it does not take ownership of error.
InvokeRecvMessageCallback(batch_data, error);
InvokeRecvMessageCallback(batch_data.release(), error);
}
//
@ -1312,24 +1286,22 @@ void RetryFilter::CallData::CallAttempt::BatchData::
GRPC_CLOSURE_INIT(
&call_attempt_->recv_initial_metadata_ready_,
InvokeRecvInitialMetadataCallback,
call_attempt_->recv_initial_metadata_ready_deferred_batch_,
call_attempt_->recv_initial_metadata_ready_deferred_batch_.release(),
grpc_schedule_on_exec_ctx);
closures->Add(&call_attempt_->recv_initial_metadata_ready_,
call_attempt_->recv_initial_metadata_error_,
"resuming recv_initial_metadata_ready");
call_attempt_->recv_initial_metadata_ready_deferred_batch_ = nullptr;
}
// Add closure for deferred recv_message_ready.
if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
nullptr)) {
GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_,
InvokeRecvMessageCallback,
call_attempt_->recv_message_ready_deferred_batch_,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(
&call_attempt_->recv_message_ready_, InvokeRecvMessageCallback,
call_attempt_->recv_message_ready_deferred_batch_.release(),
grpc_schedule_on_exec_ctx);
closures->Add(&call_attempt_->recv_message_ready_,
call_attempt_->recv_message_error_,
"resuming recv_message_ready");
call_attempt_->recv_message_ready_deferred_batch_ = nullptr;
}
}
}
@ -1370,15 +1342,12 @@ void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
// Schedule all of the closures identified above.
// Note: This will release the call combiner.
closures.RunClosures(call_attempt_->calld_->call_combiner_);
// Don't need batch_data anymore.
Unref();
GRPC_ERROR_UNREF(error);
}
void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
CallAttempt::BatchData* batch_data =
static_cast<CallAttempt::BatchData*>(arg);
RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
CallAttempt* call_attempt = batch_data->call_attempt_.get();
CallData* calld = call_attempt->calld_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -1402,17 +1371,14 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
}
// Check if we should retry.
if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) {
// Unref batch_data for deferred recv_initial_metadata_ready or
// Unref BatchData objects for deferred recv_initial_metadata_ready or
// recv_message_ready callbacks, if any.
if (call_attempt->recv_initial_metadata_ready_deferred_batch_ != nullptr) {
GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_);
batch_data->Unref();
}
if (call_attempt->recv_message_ready_deferred_batch_ != nullptr) {
GRPC_ERROR_UNREF(call_attempt->recv_message_error_);
batch_data->Unref();
}
batch_data->Unref();
call_attempt->recv_initial_metadata_ready_deferred_batch_.reset();
GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_);
call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_NONE;
call_attempt->recv_message_ready_deferred_batch_.reset();
GRPC_ERROR_UNREF(call_attempt->recv_message_error_);
call_attempt->recv_message_error_ = GRPC_ERROR_NONE;
return;
}
// Not retrying, so commit the call.
@ -1486,8 +1452,7 @@ void RetryFilter::CallData::CallAttempt::BatchData::
void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
void* arg, grpc_error_handle error) {
CallAttempt::BatchData* batch_data =
static_cast<CallAttempt::BatchData*>(arg);
RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
CallAttempt* call_attempt = batch_data->call_attempt_.get();
CallData* calld = call_attempt->calld_;
if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
@ -1525,20 +1490,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
}
}
// Track number of in-flight send batches and determine if this was the
// last one.
--calld->num_in_flight_call_attempt_send_batches_;
const bool last_send_batch_complete =
calld->num_in_flight_call_attempt_send_batches_ == 0;
// Don't need batch_data anymore.
batch_data->Unref();
// Schedule all of the closures identified above.
// Note: This yields the call combiner.
closures.RunClosures(calld->call_combiner_);
// If this was the last in-flight send batch, unref the call stack.
if (last_send_batch_complete) {
GRPC_CALL_STACK_UNREF(calld->owning_call_, "retriable_send_batches");
}
}
//

Loading…
Cancel
Save