optimize retry per-call-attempt memory usage (#26386)

reviewable/pr26134/r7
Mark D. Roth 4 years ago committed by GitHub
parent bb994526ba
commit 68a7e8a7d8
  1 changed file: src/core/ext/filters/client_channel/retry_filter.cc (154 lines changed)

@@ -211,11 +211,10 @@ class RetryFilter::CallData {
   };
   // State associated with each call attempt.
-  // Allocated on the arena.
-  class CallAttempt
-      : public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> {
+  class CallAttempt : public RefCounted<CallAttempt> {
    public:
     explicit CallAttempt(CallData* calld);
+    ~CallAttempt() override;
     ClientChannel::LoadBalancedCall* lb_call() const { return lb_call_.get(); }
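
For readers outside the gRPC codebase, the base-class change above tracks the allocation-model change later in this diff: the old RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> form pairs with arena allocation (the final unref runs only the destructor, since the arena reclaims the storage), while plain RefCounted<CallAttempt> pairs with MakeRefCounted<> (the final unref deletes the object). A minimal sketch of that policy distinction in standard C++; RefCountedSketch and UnrefBehavior are illustrative names, not gRPC's actual implementation:

    #include <atomic>
    #include <cstdio>

    // Illustrative only -- not gRPC's RefCounted.
    enum class UnrefBehavior { kDelete, kCallDtorOnly };

    template <typename Child, UnrefBehavior kBehavior = UnrefBehavior::kDelete>
    class RefCountedSketch {
     public:
      void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
      void Unref() {
        if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
          if constexpr (kBehavior == UnrefBehavior::kDelete) {
            delete static_cast<Child*>(this);     // heap-backed: dtor + free
          } else {
            static_cast<Child*>(this)->~Child();  // arena-backed: dtor only
          }
        }
      }

     private:
      std::atomic<int> refs_{1};
    };

    struct Attempt : RefCountedSketch<Attempt> {
      ~Attempt() { std::puts("attempt destroyed"); }
    };

    int main() {
      auto* attempt = new Attempt();  // starts with one ref
      attempt->Unref();               // last unref: destroys and frees
    }
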
@@ -343,6 +342,14 @@ class RetryFilter::CallData {
     // Returns true if any op in the batch was not yet started on this attempt.
     bool PendingBatchIsUnstarted(PendingBatch* pending);
+    // Returns true if there are cached send ops to replay.
+    bool HaveSendOpsToReplay();
+    // If our retry state is no longer needed, switch to fast path by moving
+    // our LB call into calld_->committed_call_ and having calld_ drop
+    // its ref to us.
+    void MaybeSwitchToFastPath();
     // Helper function used to start a recv_trailing_metadata batch. This
     // is used in the case where a recv_initial_metadata or recv_message
     // op fails in a way that we know the call is over but when the application
@@ -445,13 +452,6 @@ class RetryFilter::CallData {
   void CreateCallAttempt();
-  // Adds a closure to closures that will execute batch on lb_call in the
-  // call combiner.
-  void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
-                          ClientChannel::LoadBalancedCall* lb_call,
-                          const char* reason,
-                          CallCombinerClosureList* closures);
   RetryFilter* chand_;
   grpc_polling_entity* pollent_;
   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
@@ -473,12 +473,9 @@ class RetryFilter::CallData {
   // gets cancelled.
   RefCountedPtr<CallAttempt> call_attempt_;
-  // LB call used when the call is commited before any CallAttempt is
-  // created.
-  // TODO(roth): Change CallAttempt logic such that once we've committed
-  // and all cached send ops have been replayed, we move the LB call
-  // from the CallAttempt here, thus creating a fast path for the
-  // remainder of the streaming call.
+  // LB call used when we've committed to a call attempt and the retry
+  // state for that attempt is no longer needed. This provides a fast
+  // path for long-running streaming calls that minimizes overhead.
   RefCountedPtr<ClientChannel::LoadBalancedCall> committed_call_;
   // When we are not yet fully committed to a particular call (i.e.,
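
To make the new committed_call_ comment concrete: while call_attempt_ is set, batches flow through the attempt's retry bookkeeping (send ops get cached for possible replay), and once committed_call_ is set they go straight to the LB call. A toy model of that routing under invented stand-in types (Batch, LoadBalancedCall, and CallAttempt here are sketches, not the real filter classes):

    #include <cstdio>
    #include <memory>
    #include <utility>

    struct Batch { const char* name; };

    struct LoadBalancedCall {
      void StartTransportStreamOpBatch(Batch* b) {
        std::printf("LB call: starting %s\n", b->name);
      }
    };

    struct CallAttempt {
      std::shared_ptr<LoadBalancedCall> lb_call =
          std::make_shared<LoadBalancedCall>();
      void StartRetriableBatch(Batch* b) {
        // ... cache send ops so they can be replayed on a retry ...
        lb_call->StartTransportStreamOpBatch(b);
      }
    };

    struct CallData {
      std::shared_ptr<CallAttempt> call_attempt;         // slow path
      std::shared_ptr<LoadBalancedCall> committed_call;  // fast path

      void StartBatch(Batch* b) {
        if (committed_call != nullptr) {
          // Fast path: retry state already torn down; delegate directly.
          committed_call->StartTransportStreamOpBatch(b);
          return;
        }
        if (call_attempt == nullptr) {
          call_attempt = std::make_shared<CallAttempt>();
        }
        call_attempt->StartRetriableBatch(b);  // slow path: retry bookkeeping
      }
    };

    int main() {
      CallData call;
      Batch b1{"send_initial_metadata"};
      call.StartBatch(&b1);  // routed through the attempt
      // What MaybeSwitchToFastPath() does, in miniature:
      call.committed_call = std::move(call.call_attempt->lb_call);
      call.call_attempt.reset();
      Batch b2{"send_message"};
      call.StartBatch(&b2);  // now bypasses the retry machinery
    }
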
@@ -609,6 +606,13 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
   }
 }
+RetryFilter::CallData::CallAttempt::~CallAttempt() {
+  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+    gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: destroying call attempt",
+            calld_->chand_, calld_, this);
+  }
+}
 void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
   // TODO(roth): When we implement hedging, this logic will need to get
   // a bit more complex, because there may be other (now abandoned) call
@@ -647,6 +651,34 @@ bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted(
   return false;
 }
+bool RetryFilter::CallData::CallAttempt::HaveSendOpsToReplay() {
+  // We don't check send_initial_metadata here, because that op will always
+  // be started as soon as it is received from the surface, so it will
+  // never need to be started at this point.
+  return started_send_message_count_ < calld_->send_messages_.size() ||
+         (calld_->seen_send_trailing_metadata_ &&
+          !started_send_trailing_metadata_);
+}
+void RetryFilter::CallData::CallAttempt::MaybeSwitchToFastPath() {
+  GPR_ASSERT(calld_->retry_committed_);
+  // We still need to start new batches on this call attempt if (a) there
+  // are still send ops to replay or (b) we started an internal batch for
+  // recv_trailing_metadata but have not yet seen that op from the surface.
+  if (calld_->committed_call_ == nullptr && !HaveSendOpsToReplay() &&
+      recv_trailing_metadata_internal_batch_ == nullptr) {
+    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
+      gpr_log(GPR_INFO,
+              "chand=%p calld=%p attempt=%p: call committed and no more "
+              "send ops to replay; moving LB call to parent and unreffing "
+              "the call attempt",
+              calld_->chand_, calld_, this);
+    }
+    calld_->committed_call_ = std::move(lb_call_);
+    calld_->call_attempt_.reset();
+  }
+}
 void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() {
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
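
The guards added above read as a pure predicate over the attempt's bookkeeping. A flattened, self-contained restatement of that predicate; the AttemptState struct is invented for illustration (the real fields live across CallAttempt and CallData):

    #include <cstddef>

    struct AttemptState {
      size_t started_send_message_count = 0;
      size_t total_send_messages = 0;
      bool seen_send_trailing_metadata = false;
      bool started_send_trailing_metadata = false;
      bool have_internal_recv_trailing_batch = false;
      bool already_switched = false;  // models committed_call_ != nullptr
    };

    // Mirrors HaveSendOpsToReplay(): send_initial_metadata is not checked
    // because it is always started as soon as it arrives from the surface.
    bool HaveSendOpsToReplay(const AttemptState& s) {
      return s.started_send_message_count < s.total_send_messages ||
             (s.seen_send_trailing_metadata &&
              !s.started_send_trailing_metadata);
    }

    // Retry state can be dropped only when we have not already switched,
    // nothing is left to replay, and no internal recv_trailing_metadata
    // batch is outstanding that the surface has yet to ask for.
    bool CanSwitchToFastPath(const AttemptState& s) {
      return !s.already_switched && !HaveSendOpsToReplay(s) &&
             !s.have_internal_recv_trailing_batch;
    }

    int main() {
      AttemptState s;
      s.total_send_messages = 1;
      bool before = CanSwitchToFastPath(s);  // false: one message to replay
      s.started_send_message_count = 1;
      bool after = CanSwitchToFastPath(s);   // true: nothing left to replay
      (void)before; (void)after;
    }
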
@@ -721,6 +753,19 @@ RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
   return replay_batch_data;
 }
+namespace {
+void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
+  grpc_transport_stream_op_batch* batch =
+      static_cast<grpc_transport_stream_op_batch*>(arg);
+  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
+      batch->handler_private.extra_arg);
+  // Note: This will release the call combiner.
+  lb_call->StartTransportStreamOpBatch(batch);
+}
+}  // namespace
 void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
     grpc_transport_stream_op_batch* batch, const char* reason,
     CallCombinerClosureList* closures) {
@@ -729,7 +774,10 @@ void RetryFilter::CallData::CallAttempt::AddClosureForBatch(
             calld_->chand_, calld_, this, reason,
             grpc_transport_stream_op_batch_string(batch).c_str());
   }
-  calld_->AddClosureForBatch(batch, lb_call_.get(), reason, closures);
+  batch->handler_private.extra_arg = lb_call_.get();
+  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
+                    batch, grpc_schedule_on_exec_ctx);
+  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason);
 }
 void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
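
The two hunks above move the batch-start plumbing into CallAttempt: stash the target LB call in the batch's handler_private.extra_arg, point the batch's private closure at StartBatchInCallCombiner, and queue it on the closure list, which later drains under the call combiner. A toy model of that deferred-invocation pattern, assuming invented stand-ins for grpc_closure and the combiner (Batch, LbCall, and ClosureList are illustrative):

    #include <cstdio>
    #include <utility>
    #include <vector>

    struct Batch {
      const char* name;
      void* extra_arg = nullptr;  // models handler_private.extra_arg
    };

    struct LbCall {
      void StartTransportStreamOpBatch(Batch* b) {
        std::printf("combiner: starting %s\n", b->name);
      }
    };

    // Models StartBatchInCallCombiner(): recover the LB call stashed in
    // the batch and start the batch on it.
    void StartBatchClosure(void* arg) {
      auto* batch = static_cast<Batch*>(arg);
      static_cast<LbCall*>(batch->extra_arg)
          ->StartTransportStreamOpBatch(batch);
    }

    struct ClosureList {
      std::vector<std::pair<void (*)(void*), void*>> closures;
      void Add(void (*fn)(void*), void* arg) { closures.emplace_back(fn, arg); }
      void RunAll() {  // gRPC drains these under the call combiner's exclusion
        for (auto& c : closures) c.first(c.second);
        closures.clear();
      }
    };

    int main() {
      LbCall lb_call;
      Batch batch{"recv_message"};
      ClosureList closures;
      batch.extra_arg = &lb_call;               // AddClosureForBatch, step 1
      closures.Add(StartBatchClosure, &batch);  // step 2: queue the closure
      closures.RunAll();                        // runs once it is safe to start
    }
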
@@ -1128,6 +1176,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
     }
     // Received valid initial metadata, so commit the call.
     calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
   }
   // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
@@ -1213,6 +1264,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
     }
     // Received a valid message, so commit the call.
     calld->RetryCommit(call_attempt);
+    // If retry state is no longer needed, switch to fast path for
+    // subsequent batches.
+    call_attempt->MaybeSwitchToFastPath();
   }
   // Invoke the callback to return the result to the surface.
   // Manually invoking a callback function; it does not take ownership of error.
@@ -1393,6 +1447,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
   }
   // Not retrying, so commit the call.
   calld->RetryCommit(call_attempt);
+  // If retry state is no longer needed, switch to fast path for
+  // subsequent batches.
+  call_attempt->MaybeSwitchToFastPath();
   // Run any necessary closures.
   batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error));
 }
@@ -1430,27 +1487,22 @@ void RetryFilter::CallData::CallAttempt::BatchData::
 void RetryFilter::CallData::CallAttempt::BatchData::
     AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
   auto* calld = call_attempt_->calld_;
-  // We don't check send_initial_metadata here, because that op will always
-  // be started as soon as it is received from the surface, so it will
-  // never need to be started at this point.
-  bool have_pending_send_message_ops =
-      call_attempt_->started_send_message_count_ < calld->send_messages_.size();
-  bool have_pending_send_trailing_metadata_op =
-      calld->seen_send_trailing_metadata_ &&
-      !call_attempt_->started_send_trailing_metadata_;
-  if (!have_pending_send_message_ops &&
-      !have_pending_send_trailing_metadata_op) {
+  bool have_pending_send_ops = call_attempt_->HaveSendOpsToReplay();
+  if (!have_pending_send_ops) {
     for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
       PendingBatch* pending = &calld->pending_batches_[i];
       grpc_transport_stream_op_batch* batch = pending->batch;
       if (batch == nullptr || pending->send_ops_cached) continue;
-      if (batch->send_message) have_pending_send_message_ops = true;
-      if (batch->send_trailing_metadata) {
-        have_pending_send_trailing_metadata_op = true;
+      if (batch->send_message || batch->send_trailing_metadata) {
+        have_pending_send_ops = true;
+        break;
       }
     }
   }
-  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
+  if (have_pending_send_ops) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
               "chand=%p calld=%p attempt=%p: starting next batch for pending "
@@ -1466,6 +1518,9 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
   RefCountedPtr<BatchData> batch_data(static_cast<BatchData*>(arg));
   CallAttempt* call_attempt = batch_data->call_attempt_.get();
   CallData* calld = call_attempt->calld_;
+  // TODO(roth): If error is not GRPC_ERROR_NONE, then we should defer
+  // sending the completion of this batch to the surface until we see
+  // recv_trailing_metadata_ready and decide whether we want to retry.
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO,
             "chand=%p calld=%p attempt=%p: got on_complete, error=%s, batch=%s",
@@ -1502,6 +1557,13 @@ void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
     if (!call_attempt->completed_recv_trailing_metadata_) {
       batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
     }
+    // If the call is committed and retry state is no longer needed, switch
+    // to fast path for subsequent batches.
+    // TODO(roth): As part of implementing hedging, this logic needs to
+    // check that *this* call attempt is the one that we've committed to.
+    // Might need to replace retry_dispatched_ with an enum indicating
+    // whether we're in flight, abandoned, or the winning call attempt.
+    if (calld->retry_committed_) call_attempt->MaybeSwitchToFastPath();
   }
   // Schedule all of the closures identified above.
   // Note: This yields the call combiner.
@@ -1797,6 +1859,13 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
   // immediately create an LB call and delegate the batch to it. This
   // avoids the overhead of unnecessarily allocating a CallAttempt
   // object or caching any of the send op data.
+  // Note that we would ideally like to do this also on subsequent
+  // attempts (e.g., if a batch puts the call above the buffer size
+  // limit since the last attempt was complete), but in practice that's
+  // not really worthwhile, because we will almost always have cached and
+  // completed at least the send_initial_metadata op on the previous
+  // attempt, which means that we'd need special logic to replay the
+  // batch anyway, which is exactly what the CallAttempt object provides.
   if (num_attempts_completed_ == 0 && retry_committed_) {
     if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
       gpr_log(GPR_INFO,
@@ -1809,7 +1878,9 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
     committed_call_->StartTransportStreamOpBatch(batch);
     return;
   }
-  // We do not yet have a call attempt, so create one.
+  // Otherwise, create a call attempt.
+  // The attempt will automatically start any necessary replays or
+  // pending batches.
   if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
     gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
             this);
@@ -1839,35 +1910,12 @@ RetryFilter::CallData::CreateLoadBalancedCall() {
 }
 void RetryFilter::CallData::CreateCallAttempt() {
-  call_attempt_.reset(arena_->New<CallAttempt>(this));
+  call_attempt_ = MakeRefCounted<CallAttempt>(this);
   call_attempt_->StartRetriableBatches();
   // TODO(roth): When implementing hedging, change this to start a timer
   // for the next hedging attempt.
 }
-namespace {
-void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
-  grpc_transport_stream_op_batch* batch =
-      static_cast<grpc_transport_stream_op_batch*>(arg);
-  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
-      batch->handler_private.extra_arg);
-  // Note: This will release the call combiner.
-  lb_call->StartTransportStreamOpBatch(batch);
-}
-}  // namespace
-void RetryFilter::CallData::AddClosureForBatch(
-    grpc_transport_stream_op_batch* batch,
-    ClientChannel::LoadBalancedCall* lb_call, const char* reason,
-    CallCombinerClosureList* closures) {
-  batch->handler_private.extra_arg = lb_call;
-  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
-                    batch, grpc_schedule_on_exec_ctx);
-  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE, reason);
-}
 //
 // send op data caching
 //
