From 1514a1cf896bb1a889cc078aece541ee7b866f45 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 5 Aug 2024 10:53:20 -0700 Subject: [PATCH] [call-v3] Expand suite of passing tests: enable cancel_after_client_done (#37326) Closes #37326 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37326 from ctiller:terrible-tommy 846fa76522e34b902cc88548970650c16a9890ee PiperOrigin-RevId: 659607451 --- .../chaotic_good/client_transport.cc | 41 +++++++------ src/core/ext/transport/chaotic_good/frame.h | 3 + .../chaotic_good/server_transport.cc | 58 ++++++++++++------- .../transport/chaotic_good/server_transport.h | 5 +- src/core/lib/surface/call.cc | 6 +- src/core/lib/transport/call_spine.h | 36 ++++++++---- .../end2end/tests/cancel_after_client_done.cc | 2 - 7 files changed, 90 insertions(+), 61 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 1164ce71c3b..f4d6a3ab6c2 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -254,7 +254,11 @@ uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) { const uint32_t stream_id = next_stream_id_++; stream_map_.emplace(stream_id, call_handler); lock.Release(); - call_handler.OnDone([this, stream_id]() { + call_handler.OnDone([this, stream_id](bool cancelled) { + if (cancelled) { + outgoing_frames_.MakeSender().UnbufferedImmediateSend( + CancelFrame{stream_id}); + } MutexLock lock(&mu_); stream_map_.erase(stream_id); }); @@ -317,24 +321,23 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) { "outbound_loop", [self = RefAsSubclass(), call_handler]() mutable { const uint32_t stream_id = self->MakeStream(call_handler); - return Map(self->CallOutboundLoop(stream_id, call_handler), - [stream_id, sender = self->outgoing_frames_.MakeSender()]( - absl::Status result) mutable { - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: Call " << stream_id - << " finished with " << result.ToString(); - if (!result.ok()) { - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: Send cancel"; - CancelFrame frame; - frame.stream_id = stream_id; - if (!sender.UnbufferedImmediateSend(std::move(frame))) { - GRPC_TRACE_LOG(chaotic_good, INFO) - << "CHAOTIC_GOOD: Send cancel failed"; - } - } - return result; - }); + return Map( + self->CallOutboundLoop(stream_id, call_handler), + [stream_id, sender = self->outgoing_frames_.MakeSender()]( + absl::Status result) mutable { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: Call " << stream_id << " finished with " + << result.ToString(); + if (!result.ok()) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: Send cancel"; + if (!sender.UnbufferedImmediateSend(CancelFrame{stream_id})) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD: Send cancel failed"; + } + } + return result; + }); }); } diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index d521a483101..548280858bf 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -156,6 +156,9 @@ struct ServerFragmentFrame final : public FrameInterface { }; struct CancelFrame final : public FrameInterface { + CancelFrame() = default; + explicit CancelFrame(uint32_t stream_id) : stream_id(stream_id) {} + absl::Status Deserialize(HPackParser* parser, const FrameHeader& header, absl::BitGenRef bitsrc, Arena* arena, BufferPair buffers, FrameLimits limits) override; diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 3e83d5a4c0e..21fa69022cf 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -72,8 +72,7 @@ auto ChaoticGoodServerTransport::TransportWriteLoop( } auto ChaoticGoodServerTransport::PushFragmentIntoCall( - CallInitiator call_initiator, ClientFragmentFrame frame, - uint32_t stream_id) { + CallInitiator call_initiator, ClientFragmentFrame frame) { DCHECK(frame.headers == nullptr); GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD: PushFragmentIntoCall: frame=" << frame.ToString(); @@ -84,17 +83,15 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall( std::move(frame.message->message)); }, []() -> StatusFlag { return Success{}; }), - [this, call_initiator, end_of_stream = frame.end_of_stream, - stream_id](StatusFlag status) mutable -> StatusFlag { + [call_initiator, end_of_stream = frame.end_of_stream]( + StatusFlag status) mutable -> StatusFlag { if (!status.ok() && GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { LOG(INFO) << "CHAOTIC_GOOD: Failed PushFragmentIntoCall"; } if (end_of_stream || !status.ok()) { call_initiator.FinishSends(); - // We have received end_of_stream. It is now safe to remove - // the call from the stream map. - MutexLock lock(&mu_); - stream_map_.erase(stream_id); + // Note that we cannot remove from the stream map yet, as we + // may yet receive a cancellation. } return Success{}; }); @@ -102,17 +99,16 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall( auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( absl::optional call_initiator, absl::Status error, - ClientFragmentFrame frame, uint32_t stream_id) { + ClientFragmentFrame frame) { return If( call_initiator.has_value() && error.ok(), - [this, &call_initiator, &frame, &stream_id]() { + [this, &call_initiator, &frame]() { return Map( call_initiator->SpawnWaitable( "push-fragment", - [call_initiator, frame = std::move(frame), stream_id, - this]() mutable { - return call_initiator->CancelIfFails(PushFragmentIntoCall( - *call_initiator, std::move(frame), stream_id)); + [call_initiator, frame = std::move(frame), this]() mutable { + return call_initiator->CancelIfFails( + PushFragmentIntoCall(*call_initiator, std::move(frame))); }), [](StatusFlag status) { return StatusCast(status); }); }, @@ -255,8 +251,7 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( } } return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), - std::move(fragment_frame), - frame_header.stream_id); + std::move(fragment_frame)); } auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall( @@ -271,8 +266,7 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall( frame_header, std::move(buffers), arena, fragment_frame, FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1}); return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), - std::move(fragment_frame), - frame_header.stream_id); + std::move(fragment_frame)); } auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) { @@ -305,6 +299,10 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) { [this, &frame_header]() { absl::optional call_initiator = ExtractStream(frame_header.stream_id); + GRPC_TRACE_LOG(chaotic_good, INFO) + << "Cancel stream " << frame_header.stream_id + << (call_initiator.has_value() ? " (active)" + : " (not found)"); return If( call_initiator.has_value(), [&call_initiator]() { @@ -410,6 +408,8 @@ void ChaoticGoodServerTransport::AbortWithError() { absl::optional ChaoticGoodServerTransport::LookupStream( uint32_t stream_id) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD " << this << " LookupStream " << stream_id; MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it == stream_map_.end()) return absl::nullopt; @@ -418,6 +418,8 @@ absl::optional ChaoticGoodServerTransport::LookupStream( absl::optional ChaoticGoodServerTransport::ExtractStream( uint32_t stream_id) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD " << this << " ExtractStream " << stream_id; MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it == stream_map_.end()) return absl::nullopt; @@ -428,6 +430,8 @@ absl::optional ChaoticGoodServerTransport::ExtractStream( absl::Status ChaoticGoodServerTransport::NewStream( uint32_t stream_id, CallInitiator call_initiator) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD " << this << " NewStream " << stream_id; MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it != stream_map_.end()) { @@ -437,10 +441,20 @@ absl::Status ChaoticGoodServerTransport::NewStream( return absl::InternalError("Stream id is not increasing"); } stream_map_.emplace(stream_id, call_initiator); - call_initiator.OnDone([this, stream_id]() { - MutexLock lock(&mu_); - stream_map_.erase(stream_id); - }); + call_initiator.OnDone( + [self = RefAsSubclass(), stream_id](bool) { + GRPC_TRACE_LOG(chaotic_good, INFO) + << "CHAOTIC_GOOD " << self.get() << " OnDone " << stream_id; + absl::optional call_initiator = + self->ExtractStream(stream_id); + if (call_initiator.has_value()) { + auto c = std::move(*call_initiator); + c.SpawnInfallible("cancel", [c]() mutable { + c.Cancel(); + return Empty{}; + }); + } + }); return absl::OkStatus(); } diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 2cd1fb5974c..e343b245506 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -131,10 +131,9 @@ class ChaoticGoodServerTransport final : public ServerTransport { FrameHeader frame_header, BufferPair buffers, ChaoticGoodTransport& transport); auto MaybePushFragmentIntoCall(absl::optional call_initiator, - absl::Status error, ClientFragmentFrame frame, - uint32_t stream_id); + absl::Status error, ClientFragmentFrame frame); auto PushFragmentIntoCall(CallInitiator call_initiator, - ClientFragmentFrame frame, uint32_t stream_id); + ClientFragmentFrame frame); RefCountedPtr call_destination_; const RefCountedPtr call_arena_allocator_; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c2db94ee4d7..d7a359db68e 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -337,9 +337,9 @@ void Call::HandleCompressionAlgorithmDisabled( void Call::UpdateDeadline(Timestamp deadline) { ReleasableMutexLock lock(&deadline_mu_); if (GRPC_TRACE_FLAG_ENABLED(call)) { - VLOG(2) << "[call " << this - << "] UpdateDeadline from=" << deadline_.ToString() - << " to=" << deadline.ToString(); + LOG(INFO) << "[call " << this + << "] UpdateDeadline from=" << deadline_.ToString() + << " to=" << deadline.ToString(); } if (deadline >= deadline_) return; if (deadline < Timestamp::Now()) { diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index 4b3d9c6993e..580d94067b9 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -51,23 +51,24 @@ class CallSpine final : public Party { std::move(client_initial_metadata), std::move(arena))); } - ~CallSpine() override {} + ~CallSpine() override { CallOnDone(true); } CallFilters& call_filters() { return call_filters_; } // Add a callback to be called when server trailing metadata is received. - void OnDone(absl::AnyInvocable fn) { + void OnDone(absl::AnyInvocable fn) { if (on_done_ == nullptr) { on_done_ = std::move(fn); return; } - on_done_ = [first = std::move(fn), next = std::move(on_done_)]() mutable { - first(); - next(); + on_done_ = [first = std::move(fn), + next = std::move(on_done_)](bool cancelled) mutable { + first(cancelled); + next(cancelled); }; } - void CallOnDone() { - if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(); + void CallOnDone(bool cancelled) { + if (on_done_ != nullptr) std::exchange(on_done_, nullptr)(cancelled); } auto PullServerInitialMetadata() { @@ -75,7 +76,12 @@ class CallSpine final : public Party { } auto PullServerTrailingMetadata() { - return call_filters().PullServerTrailingMetadata(); + return Map( + call_filters().PullServerTrailingMetadata(), + [this](ServerMetadataHandle result) { + CallOnDone(result->get(GrpcCallWasCancelled()).value_or(false)); + return result; + }); } auto PushClientToServerMessage(MessageHandle message) { @@ -190,7 +196,7 @@ class CallSpine final : public Party { // Call filters/pipes part of the spine CallFilters call_filters_; - absl::AnyInvocable on_done_{nullptr}; + absl::AnyInvocable on_done_{nullptr}; }; class CallInitiator { @@ -227,7 +233,9 @@ class CallInitiator { spine_->PushServerTrailingMetadata(std::move(status)); } - void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + void OnDone(absl::AnyInvocable fn) { + spine_->OnDone(std::move(fn)); + } template void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { @@ -274,7 +282,9 @@ class CallHandler { spine_->PushServerTrailingMetadata(std::move(status)); } - void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + void OnDone(absl::AnyInvocable fn) { + spine_->OnDone(std::move(fn)); + } template auto CancelIfFails(Promise promise) { @@ -327,7 +337,9 @@ class UnstartedCallHandler { spine_->PushServerTrailingMetadata(std::move(status)); } - void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + void OnDone(absl::AnyInvocable fn) { + spine_->OnDone(std::move(fn)); + } template auto CancelIfFails(Promise promise) { diff --git a/test/core/end2end/tests/cancel_after_client_done.cc b/test/core/end2end/tests/cancel_after_client_done.cc index d4a0f8d0ccf..c1255ae7587 100644 --- a/test/core/end2end/tests/cancel_after_client_done.cc +++ b/test/core/end2end/tests/cancel_after_client_done.cc @@ -67,12 +67,10 @@ void CancelAfterClientDone( } CORE_END2END_TEST(CoreEnd2endTest, CancelAfterClientDone) { - SKIP_IF_V3(); CancelAfterClientDone(*this, std::make_unique()); } CORE_END2END_TEST(CoreDeadlineTest, DeadlineAfterClientDone) { - SKIP_IF_V3(); CancelAfterClientDone(*this, std::make_unique()); }