From 193601579817204ebf9a461e61a2e43bf2dc93f4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 3 Feb 2023 12:00:58 -0800 Subject: [PATCH] [promises] Account for another edge case in promise based filter (#32282) --- src/core/lib/channel/promise_based_filter.cc | 20 ++++++++++++++++++++ src/core/lib/channel/promise_based_filter.h | 4 ++++ 2 files changed, 24 insertions(+) diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 6a5b57c1373..95cb90301d3 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -579,8 +579,12 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) { return "CANCELLED"; case State::kCancelledWhilstForwarding: return "CANCELLED_WHILST_FORWARDING"; + case State::kCancelledWhilstForwardingNoPipe: + return "CANCELLED_WHILST_FORWARDING_NO_PIPE"; case State::kBatchCompletedButCancelled: return "BATCH_COMPLETED_BUT_CANCELLED"; + case State::kBatchCompletedButCancelledNoPipe: + return "BATCH_COMPLETED_BUT_CANCELLED_NO_PIPE"; case State::kCancelledWhilstIdle: return "CANCELLED_WHILST_IDLE"; case State::kCompletedWhilePulledFromPipe: @@ -606,7 +610,9 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { state_ = State::kForwardedBatch; break; case State::kCancelledWhilstForwarding: + case State::kCancelledWhilstForwardingNoPipe: case State::kBatchCompletedButCancelled: + case State::kBatchCompletedButCancelledNoPipe: case State::kForwardedBatch: case State::kForwardedBatchNoPipe: case State::kBatchCompleted: @@ -657,8 +663,10 @@ void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) { case State::kCompletedWhilePushedToPipe: case State::kCompletedWhileBatchCompleted: case State::kCancelledWhilstForwarding: + case State::kCancelledWhilstForwardingNoPipe: case State::kCancelledWhilstIdle: case State::kBatchCompletedButCancelled: + case State::kBatchCompletedButCancelledNoPipe: Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); case State::kCancelled: return; @@ -682,6 +690,7 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { case State::kBatchCompletedNoPipe: case State::kCancelled: case State::kBatchCompletedButCancelled: + case State::kBatchCompletedButCancelledNoPipe: case State::kCancelledWhilstIdle: case State::kCompletedWhilePulledFromPipe: case State::kCompletedWhilePushedToPipe: @@ -695,6 +704,9 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { case State::kCancelledWhilstForwarding: state_ = State::kBatchCompletedButCancelled; break; + case State::kCancelledWhilstForwardingNoPipe: + state_ = State::kBatchCompletedButCancelledNoPipe; + break; } completed_status_ = status; Flusher flusher(base_); @@ -747,9 +759,11 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata, } break; case State::kBatchCompletedNoPipe: case State::kBatchCompletedButCancelled: + case State::kBatchCompletedButCancelledNoPipe: Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); case State::kCancelledWhilstIdle: case State::kCancelledWhilstForwarding: + case State::kCancelledWhilstForwardingNoPipe: case State::kCancelled: break; } @@ -772,6 +786,7 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, case State::kForwardedBatch: case State::kCancelled: case State::kCancelledWhilstForwarding: + case State::kCancelledWhilstForwardingNoPipe: case State::kBatchCompletedNoPipe: break; case State::kCancelledWhilstIdle: @@ -785,6 +800,11 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), completed_status_, "recv_message"); break; + case State::kBatchCompletedButCancelledNoPipe: + state_ = State::kCancelled; + flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), + completed_status_, "recv_message"); + break; case State::kBatchCompleted: if (completed_status_.ok() && intercepted_slice_buffer_->has_value()) { if (!allow_push_to_pipe) break; diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 9b6a705990d..64983d86c09 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -479,11 +479,15 @@ class BaseCallData : public Activity, private Wakeable { // stack: we need to keep track of that until we get the completion so // that we do the right thing in OnComplete. kCancelledWhilstForwarding, + // The same, but before we got the pipe + kCancelledWhilstForwardingNoPipe, // Call got terminated whilst we had a recv_message batch completed, and // we've now received the completion. // On the next poll we'll close things out and forward on completions, // then transition to cancelled. kBatchCompletedButCancelled, + // The same, but before we got the pipe + kBatchCompletedButCancelledNoPipe, // Completed successfully while we're processing a recv message - see // kPushedToPipe. kCompletedWhilePushedToPipe,