|
|
|
@ -579,8 +579,12 @@ const char* BaseCallData::ReceiveMessage::StateString(State state) { |
|
|
|
|
return "CANCELLED"; |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
return "CANCELLED_WHILST_FORWARDING"; |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
return "CANCELLED_WHILST_FORWARDING_NO_PIPE"; |
|
|
|
|
case State::kBatchCompletedButCancelled: |
|
|
|
|
return "BATCH_COMPLETED_BUT_CANCELLED"; |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
return "BATCH_COMPLETED_BUT_CANCELLED_NO_PIPE"; |
|
|
|
|
case State::kCancelledWhilstIdle: |
|
|
|
|
return "CANCELLED_WHILST_IDLE"; |
|
|
|
|
case State::kCompletedWhilePulledFromPipe: |
|
|
|
@ -606,7 +610,9 @@ void BaseCallData::ReceiveMessage::StartOp(CapturedBatch& batch) { |
|
|
|
|
state_ = State::kForwardedBatch; |
|
|
|
|
break; |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
case State::kBatchCompletedButCancelled: |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
case State::kForwardedBatch: |
|
|
|
|
case State::kForwardedBatchNoPipe: |
|
|
|
|
case State::kBatchCompleted: |
|
|
|
@ -657,8 +663,10 @@ void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) { |
|
|
|
|
case State::kCompletedWhilePushedToPipe: |
|
|
|
|
case State::kCompletedWhileBatchCompleted: |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
case State::kCancelledWhilstIdle: |
|
|
|
|
case State::kBatchCompletedButCancelled: |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); |
|
|
|
|
case State::kCancelled: |
|
|
|
|
return; |
|
|
|
@ -682,6 +690,7 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { |
|
|
|
|
case State::kBatchCompletedNoPipe: |
|
|
|
|
case State::kCancelled: |
|
|
|
|
case State::kBatchCompletedButCancelled: |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
case State::kCancelledWhilstIdle: |
|
|
|
|
case State::kCompletedWhilePulledFromPipe: |
|
|
|
|
case State::kCompletedWhilePushedToPipe: |
|
|
|
@ -695,6 +704,9 @@ void BaseCallData::ReceiveMessage::OnComplete(absl::Status status) { |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
state_ = State::kBatchCompletedButCancelled; |
|
|
|
|
break; |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
state_ = State::kBatchCompletedButCancelledNoPipe; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
completed_status_ = status; |
|
|
|
|
Flusher flusher(base_); |
|
|
|
@ -747,9 +759,11 @@ void BaseCallData::ReceiveMessage::Done(const ServerMetadata& metadata, |
|
|
|
|
} break; |
|
|
|
|
case State::kBatchCompletedNoPipe: |
|
|
|
|
case State::kBatchCompletedButCancelled: |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
Crash(absl::StrFormat("ILLEGAL STATE: %s", StateString(state_))); |
|
|
|
|
case State::kCancelledWhilstIdle: |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
case State::kCancelled: |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
@ -772,6 +786,7 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, |
|
|
|
|
case State::kForwardedBatch: |
|
|
|
|
case State::kCancelled: |
|
|
|
|
case State::kCancelledWhilstForwarding: |
|
|
|
|
case State::kCancelledWhilstForwardingNoPipe: |
|
|
|
|
case State::kBatchCompletedNoPipe: |
|
|
|
|
break; |
|
|
|
|
case State::kCancelledWhilstIdle: |
|
|
|
@ -785,6 +800,11 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher, |
|
|
|
|
flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), |
|
|
|
|
completed_status_, "recv_message"); |
|
|
|
|
break; |
|
|
|
|
case State::kBatchCompletedButCancelledNoPipe: |
|
|
|
|
state_ = State::kCancelled; |
|
|
|
|
flusher->AddClosure(std::exchange(intercepted_on_complete_, nullptr), |
|
|
|
|
completed_status_, "recv_message"); |
|
|
|
|
break; |
|
|
|
|
case State::kBatchCompleted: |
|
|
|
|
if (completed_status_.ok() && intercepted_slice_buffer_->has_value()) { |
|
|
|
|
if (!allow_push_to_pipe) break; |
|
|
|
|