[promises] Fix ordering problems shown up deploying experiment internally (#33920)

pull/33910/head^2
Craig Tiller 1 year ago committed by GitHub
parent c5246fc059
commit b4cf62725e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/core/lib/surface/call.cc

@ -2227,6 +2227,10 @@ class PromiseBasedCall : public Call,
} }
} }
void set_failed_before_recv_message() {
failed_before_recv_message_.store(true, std::memory_order_relaxed);
}
private: private:
union CompletionInfo { union CompletionInfo {
static constexpr uint32_t kOpFailed = 0x8000'0000u; static constexpr uint32_t kOpFailed = 0x8000'0000u;
@ -2600,7 +2604,7 @@ void PromiseBasedCall::StartRecvMessage(
"finishes: received end-of-stream with error", "finishes: received end-of-stream with error",
DebugTag().c_str()); DebugTag().c_str());
} }
failed_before_recv_message_.store(true); set_failed_before_recv_message();
FailCompletion(completion); FailCompletion(completion);
if (cancel_on_error) CancelWithError(absl::CancelledError()); if (cancel_on_error) CancelWithError(absl::CancelledError());
*recv_message_ = nullptr; *recv_message_ = nullptr;
@ -3244,11 +3248,13 @@ void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
channelz_node->RecordCallFailed(); channelz_node->RecordCallFailed();
} }
} }
bool was_cancelled = result->get(GrpcCallWasCancelled()).value_or(true);
if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo( if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo(
result->get(GrpcCallWasCancelled()).value_or(true))) { was_cancelled)) {
FinishOpOnCompletion(&recv_close_completion_, FinishOpOnCompletion(&recv_close_completion_,
PendingOp::kReceiveCloseOnServer); PendingOp::kReceiveCloseOnServer);
} }
if (was_cancelled) set_failed_before_recv_message();
if (server_initial_metadata_ != nullptr) { if (server_initial_metadata_ != nullptr) {
server_initial_metadata_->Close(); server_initial_metadata_->Close();
} }
@ -3324,7 +3330,10 @@ void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
[this, [this,
completion = AddOpToCompletion( completion = AddOpToCompletion(
completion, PendingOp::kSendInitialMetadata)](bool r) mutable { completion, PendingOp::kSendInitialMetadata)](bool r) mutable {
if (!r) FailCompletion(completion); if (!r) {
set_failed_before_recv_message();
FailCompletion(completion);
}
FinishOpOnCompletion(&completion, FinishOpOnCompletion(&completion,
PendingOp::kSendInitialMetadata); PendingOp::kSendInitialMetadata);
}); });
@ -3374,7 +3383,10 @@ void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
[this, completion = AddOpToCompletion( [this, completion = AddOpToCompletion(
completion, PendingOp::kSendStatusFromServer)]( completion, PendingOp::kSendStatusFromServer)](
bool ok) mutable { bool ok) mutable {
if (!ok) FailCompletion(completion); if (!ok) {
set_failed_before_recv_message();
FailCompletion(completion);
}
FinishOpOnCompletion(&completion, FinishOpOnCompletion(&completion,
PendingOp::kSendStatusFromServer); PendingOp::kSendStatusFromServer);
}); });

Loading…
Cancel
Save