From 68a7e8a7d83bdec531ecc555243acfbb6728d2b2 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth" <roth@google.com>
Date: Thu, 3 Jun 2021 07:38:59 -0700
Subject: [PATCH] optimize retry per-call-attempt memory usage (#26386)

---
 .../filters/client_channel/retry_filter.cc    | 154 ++++++++++++------
 1 file changed, 101 insertions(+), 53 deletions(-)

diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc
index e8b0f8694bf..fd24641c472 100644
--- a/src/core/ext/filters/client_channel/retry_filter.cc
+++ b/src/core/ext/filters/client_channel/retry_filter.cc
@@ -211,11 +211,10 @@ class RetryFilter::CallData {
   };
 
   // State associated with each call attempt.
-  // Allocated on the arena.
-  class CallAttempt
-      : public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> {
+  class CallAttempt : public RefCounted<CallAttempt> {
    public:
     explicit CallAttempt(CallData* calld);
+    ~CallAttempt() override;
 
     ClientChannel::LoadBalancedCall* lb_call() const { return lb_call_.get(); }
 
@@ -343,6 +342,14 @@ class RetryFilter::CallData {
     // Returns true if any op in the batch was not yet started on this attempt.
     bool PendingBatchIsUnstarted(PendingBatch* pending);
 
+    // Returns true if there are cached send ops to replay.
+    bool HaveSendOpsToReplay();
+
+    // If our retry state is no longer needed, switch to fast path by moving
+    // our LB call into calld_->committed_call_ and having calld_ drop
+    // its ref to us.
+    void MaybeSwitchToFastPath();
+
     // Helper function used to start a recv_trailing_metadata batch. This
     // is used in the case where a recv_initial_metadata or recv_message
     // op fails in a way that we know the call is over but when the application
@@ -445,13 +452,6 @@ class RetryFilter::CallData {
 
   void CreateCallAttempt();
 
-  // Adds a closure to closures that will execute batch on lb_call in the
-  // call combiner.
-  void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
-                          ClientChannel::LoadBalancedCall* lb_call,
-                          const char* reason,
-                          CallCombinerClosureList* closures);
-
   RetryFilter* chand_;
   grpc_polling_entity* pollent_;
   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
@@ -473,12 +473,9 @@ class RetryFilter::CallData {
   // gets cancelled.
   RefCountedPtr<CallAttempt> call_attempt_;
 
-  // LB call used when the call is commited before any CallAttempt is
-  // created.
-  // TODO(roth): Change CallAttempt logic such that once we've committed
-  // and all cached send ops have been replayed, we move the LB call
-  // from the CallAttempt here, thus creating a fast path for the
-  // remainder of the streaming call.
+  // LB call used when we've committed to a call attempt and the retry
+  // state for that attempt is no longer needed.  This provides a fast
+  // path for long-running streaming calls that minimizes overhead.
   RefCountedPtr<ClientChannel::LoadBalancedCall> committed_call_;
 
   // When are are not yet fully committed to a particular call (i.e.,
@@ -609,6 +606,13 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
   }
 }
 
+RetryFilter::CallData::CallAttempt::~CallAttempt() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying call attempt",
+            calld_->chand_, calld_, this);
+  }
+}
+
 void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
   // TODO(roth): When we implement hedging, this logic will need to get
   // a bit more complex, because there may be other (now abandoned) call
@@ -647,6 +651,34 @@ bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted(
   return false;
 }
 
+bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
+  // We don't check send_initial_metadata here, because that op will always
+  // be started as soon as it is received from the surface, so it will
+  // never need to be started at this point.
+  return started_send_message_count_ < calld_->send_messages_.size() ||
+         (calld_->seen_send_trailing_metadata_ &&
+          !started_send_trailing_metadata_);
+}
+
+void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
+  GPR_ASSERT(calld_->retry_committed_);
+  // We still need to start new batches on this call attempt if (a) there
+  // are still send ops to replay or (b) we started an internal batch for
+  // recv_trailing_metadata but have not yet seen that op from the surface.
+  if (calld_->committed_call_ == nullptr && !HaveSendOpsToReplay() &&
+      recv_trailing_metadata_internal_batch_ == nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p attempt=%p: call committed and no more "
+              "send ops to replay; moving LB call to parent and unreffing "
+              "the call attempt",
+              calld_->chand_, calld_, this);
+    }
+    calld_->committed_call_ = std::move(lb_call_);
+    calld_->call_attempt_.reset();
+  }
+}
+
 void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
@@ -721,6 +753,19 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
   return replay_batch_data;
 }
 
+namespace {
+
+void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
+      batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  lb_call->StartTransportStreamOpBatch(batch);
+}
+
+}  // namespace
+
 void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
     grpc_transport_stream_op_batch* batch, const char* reason,
     CallCombinerClosureList* closures) {
@@ -729,7 +774,10 @@ void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
             calld_->chand_, calld_, this, reason,
             grpc_transport_stream_op_batch_string(batch).c_str());
   }
-  calld_->AddClosureForBatch(batch, lb_call_.get(), reason, closures);
+  batch->handler_private.extra_arg = lb_call_.get();
+  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+                    batch, grpc_schedule_on_exec_ctx);
+  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason);
 }
 
 void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
@@ -1128,6 +1176,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
     }
     // Received valid initial metadata, so commit the call.
     calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
   }
   // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
@@ -1213,6 +1264,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
     }
     // Received a valid message, so commit the call.
     calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
   }
   // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
@@ -1393,6 +1447,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
   }
   // Not retrying, so commit the call.
   calld->RetryCommit(call_attempt);
+  // If retry state is no longer needed, switch to fast path for
+  // subsequent batches.
+  call_attempt->MaybeSwitchToFastPath();
   // Run any necessary closures.
   batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error));
 }
@@ -1430,27 +1487,22 @@ void RetryFilter::CallData::CallAttempt::BatchData::
 void RetryFilter::CallData::CallAttempt::BatchData::
     AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
   auto* calld = call_attempt_->calld_;
+  bool have_pending_send_ops = call_attempt_->HaveSendOpsToReplay();
   // We don't check send_initial_metadata here, because that op will always
   // be started as soon as it is received from the surface, so it will
   // never need to be started at this point.
-  bool have_pending_send_message_ops =
-      call_attempt_->started_send_message_count_ < calld->send_messages_.size();
-  bool have_pending_send_trailing_metadata_op =
-      calld->seen_send_trailing_metadata_ &&
-      !call_attempt_->started_send_trailing_metadata_;
-  if (!have_pending_send_message_ops &&
-      !have_pending_send_trailing_metadata_op) {
+  if (!have_pending_send_ops) {
     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
       PendingBatch* pending = &calld->pending_batches_[i];
       grpc_transport_stream_op_batch* batch = pending->batch;
       if (batch == nullptr || pending->send_ops_cached) continue;
-      if (batch->send_message) have_pending_send_message_ops = true;
-      if (batch->send_trailing_metadata) {
-        have_pending_send_trailing_metadata_op = true;
+      if (batch->send_message || batch->send_trailing_metadata) {
+        have_pending_send_ops = true;
+        break;
       }
     }
   }
-  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
+  if (have_pending_send_ops) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p attempt=%p: starting next batch for pending "
@@ -1466,6 +1518,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
   RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
   CallAttempt* call_attempt = batch_data->call_attempt_.get();
   CallData* calld = call_attempt->calld_;
+  // TODO(roth): If error is not GRPC_ERROR_NONE, then we should defer
+  // sending the completion of this batch to the surface until we see
+  // recv_trailing_metadata_ready and decide whether we want to retry.
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p attempt=%p: got on_complete, error=%s, batch=%s",
@@ -1502,6 +1557,13 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
     if (!call_attempt->completed_recv_trailing_metadata_) {
       batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
     }
+    // If the call is committed and retry state is no longer needed, switch
+    // to fast path for subsequent batches.
+    // TODO(roth): As part of implementing hedging, this logic needs to
+    // check that *this* call attempt is the one that we've committed to.
+    // Might need to replace retry_dispatched_ with an enum indicating
+    // whether we're in flight, abandoned, or the winning call attempt.
+    if (calld->retry_committed_) call_attempt->MaybeSwitchToFastPath();
   }
   // Schedule all of the closures identified above.
   // Note: This yields the call combiner.
@@ -1797,6 +1859,13 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
   // immediately create an LB call and delegate the batch to it. This
   // avoids the overhead of unnecessarily allocating a CallAttempt
   // object or caching any of the send op data.
+  // Note that we would ideally like to do this also on subsequent
+  // attempts (e.g., if a batch puts the call above the buffer size
+  // limit since the last attempt was complete), but in practice that's
+  // not really worthwhile, because we will almost always have cached and
+  // completed at least the send_initial_metadata op on the previous
+  // attempt, which means that we'd need special logic to replay the
+  // batch anyway, which is exactly what the CallAttempt object provides.
   if (num_attempts_completed_ == 0 && retry_committed_) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
@@ -1809,7 +1878,9 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
     committed_call_->StartTransportStreamOpBatch(batch);
     return;
   }
-  // We do not yet have a call attempt, so create one.
+  // Otherwise, create a call attempt.
+  // The attempt will automatically start any necessary replays or
+  // pending batches.
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
             this);
@@ -1839,35 +1910,12 @@ RetryFilter::CallData::CreateLoadBalancedCall() {
 }
 
 void RetryFilter::CallData::CreateCallAttempt() {
-  call_attempt_.reset(arena_->New<CallAttempt>(this));
+  call_attempt_ = MakeRefCounted<CallAttempt>(this);
   call_attempt_->StartRetriableBatches();
   // TODO(roth): When implementing hedging, change this to start a timer
   // for the next hedging attempt.
 }
 
-namespace {
-
-void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
-  grpc_transport_stream_op_batch* batch =
-      static_cast<grpc_transport_stream_op_batch*>(arg);
-  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
-      batch->handler_private.extra_arg);
-  // Note: This will release the call combiner.
-  lb_call->StartTransportStreamOpBatch(batch);
-}
-
-}  // namespace
-
-void RetryFilter::CallData::AddClosureForBatch(
-    grpc_transport_stream_op_batch* batch,
-    ClientChannel::LoadBalancedCall* lb_call, const char* reason,
-    CallCombinerClosureList* closures) {
-  batch->handler_private.extra_arg = lb_call;
-  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
-                    batch, grpc_schedule_on_exec_ctx);
-  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason);
-}
-
 //
 // send op data caching
 //
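
Not part of the patch above: for readers unfamiliar with the retry filter internals, the following standalone sketch illustrates the ownership handoff that MaybeSwitchToFastPath() performs. It uses std::shared_ptr in place of grpc_core::RefCountedPtr, and every type and member name here is hypothetical; it only models the idea that, once the call is committed and nothing remains to replay, the attempt's LB-call reference moves up to the parent call and the parent drops its ref to the attempt, so later batches bypass the per-attempt retry state.

// Minimal sketch of the fast-path handoff (hypothetical names, not gRPC's API).
#include <cassert>
#include <cstdio>
#include <memory>
#include <utility>

struct LoadBalancedCall {
  void StartBatch(const char* batch) { std::printf("starting %s\n", batch); }
};

struct Call;  // parent call data (owner of the committed fast-path call)

struct CallAttempt {
  explicit CallAttempt(Call* parent)
      : parent_(parent), lb_call_(std::make_shared<LoadBalancedCall>()) {}

  bool HaveSendOpsToReplay() const { return send_ops_to_replay_ > 0; }
  void MaybeSwitchToFastPath();  // defined below, after Call

  Call* parent_;
  std::shared_ptr<LoadBalancedCall> lb_call_;
  int send_ops_to_replay_ = 0;  // stand-in for the cached send-op bookkeeping
};

struct Call {
  bool retry_committed_ = false;
  std::shared_ptr<CallAttempt> call_attempt_;          // retry (slow) path
  std::shared_ptr<LoadBalancedCall> committed_call_;   // fast path, once set

  void StartBatch(const char* batch) {
    if (committed_call_ != nullptr) {
      // Fast path: no per-attempt retry bookkeeping for this batch.
      committed_call_->StartBatch(batch);
      return;
    }
    // Slow path: the attempt still owns the LB call and its retry state.
    call_attempt_->lb_call_->StartBatch(batch);
  }
};

void CallAttempt::MaybeSwitchToFastPath() {
  assert(parent_->retry_committed_);
  // Hand off only once there is nothing left to replay on this attempt.
  if (parent_->committed_call_ == nullptr && !HaveSendOpsToReplay()) {
    parent_->committed_call_ = std::move(lb_call_);  // move LB call to parent
    parent_->call_attempt_.reset();  // parent drops its ref to this attempt
  }
}

int main() {
  Call call;
  call.call_attempt_ = std::make_shared<CallAttempt>(&call);
  call.StartBatch("send_initial_metadata");  // routed through the attempt
  call.retry_committed_ = true;
  // Hold a local ref while calling, mirroring how a batch callback keeps the
  // real CallAttempt alive even after the parent releases its ref.
  auto attempt = call.call_attempt_;
  attempt->MaybeSwitchToFastPath();
  call.StartBatch("send_message");  // now goes straight to the LB call
}

The point of the handoff is that, for a long-running streaming call that has already committed, every subsequent batch avoids the attempt's replay and bookkeeping logic entirely, which is the memory and CPU saving the patch title refers to.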