From d8f0f64ee61ea57bb07a33491c10702ae645b24f Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 14 Mar 2024 15:20:19 -0700 Subject: [PATCH] [chaotic-good] Remove ref to server side call early (#36090) Closes #36090 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36090 from Vignesh2208:chaotic-good-fix 984ec1cdc637569b847936f6769cf93ff0122dfe PiperOrigin-RevId: 615920404 --- .../chaotic_good/server_transport.cc | 108 ++++++++++++------ .../transport/chaotic_good/server_transport.h | 8 +- src/core/lib/transport/call_spine.h | 2 + 3 files changed, 77 insertions(+), 41 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index c6e07ffd151..50e6ee3f9dd 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -71,7 +71,8 @@ auto ChaoticGoodServerTransport::TransportWriteLoop( } auto ChaoticGoodServerTransport::PushFragmentIntoCall( - CallInitiator call_initiator, ClientFragmentFrame frame) { + CallInitiator call_initiator, ClientFragmentFrame frame, + uint32_t stream_id) { auto& headers = frame.headers; return TrySeq( If( @@ -88,44 +89,61 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall( }, []() -> StatusFlag { return Success{}; }); }, - [call_initiator, - end_of_stream = frame.end_of_stream]() mutable -> StatusFlag { - if (end_of_stream) call_initiator.FinishSends(); + [this, call_initiator, end_of_stream = frame.end_of_stream, + stream_id]() mutable -> StatusFlag { + if (end_of_stream) { + call_initiator.FinishSends(); + // We have received end_of_stream. It is now safe to remove the call + // from the stream map. + MutexLock lock(&mu_); + stream_map_.erase(stream_id); + } return Success{}; }); } auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( absl::optional call_initiator, absl::Status error, - ClientFragmentFrame frame) { + ClientFragmentFrame frame, uint32_t stream_id) { return If( call_initiator.has_value() && error.ok(), - [this, &call_initiator, &frame]() { + [this, &call_initiator, &frame, &stream_id]() { return Map( call_initiator->SpawnWaitable( "push-fragment", - [call_initiator, frame = std::move(frame), this]() mutable { - return call_initiator->CancelIfFails( - PushFragmentIntoCall(*call_initiator, std::move(frame))); + [call_initiator, frame = std::move(frame), stream_id, + this]() mutable { + return call_initiator->CancelIfFails(PushFragmentIntoCall( + *call_initiator, std::move(frame), stream_id)); }), [](StatusFlag status) { return StatusCast(status); }); }, [&error, &frame]() { - gpr_log(GPR_INFO, - "CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s", - error.ToString().c_str(), frame.ToString().c_str()); + // EOF frames may arrive after the call_initiator's OnDone callback + // has been invoked. In that case, the call_initiator would have + // already been removed from the stream_map and hence the EOF frame + // cannot be pushed into the call. No need to log such frames. + if (!frame.end_of_stream) { + gpr_log( + GPR_INFO, + "CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s", + error.ToString().c_str(), frame.ToString().c_str()); + } return Immediate(std::move(error)); }); } auto ChaoticGoodServerTransport::SendFragment( - ServerFragmentFrame frame, MpscSender outgoing_frames) { + ServerFragmentFrame frame, MpscSender outgoing_frames, + CallInitiator call_initiator) { if (grpc_chaotic_good_trace.enabled()) { gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s", frame.ToString().c_str()); } + // Capture the call_initiator to ensure the underlying call spine is alive + // until the outgoing_frames.Send promise completes. return Map(outgoing_frames.Send(std::move(frame)), - [](bool success) -> absl::Status { + [call_initiator](bool success) -> absl::Status { if (!success) { // Failed to send outgoing frame. return absl::UnavailableError("Transport closed."); @@ -139,24 +157,27 @@ auto ChaoticGoodServerTransport::SendCallBody( CallInitiator call_initiator) { // Continuously send client frame with client to server // messages. - return ForEach(OutgoingMessages(call_initiator), - [stream_id, outgoing_frames, aligned_bytes = aligned_bytes_]( - MessageHandle message) mutable { - ServerFragmentFrame frame; - // Construct frame header (flags, header_length - // and trailer_length will be added in - // serialization). - const uint32_t message_length = message->payload()->Length(); - const uint32_t padding = - message_length % aligned_bytes == 0 - ? 0 - : aligned_bytes - message_length % aligned_bytes; - GPR_ASSERT((message_length + padding) % aligned_bytes == 0); - frame.message = FragmentMessage(std::move(message), padding, - message_length); - frame.stream_id = stream_id; - return SendFragment(std::move(frame), outgoing_frames); - }); + return ForEach( + OutgoingMessages(call_initiator), + // Capture the call_initator to ensure the underlying call + // spine is alive until the SendFragment promise completes. + [stream_id, outgoing_frames, call_initiator, + aligned_bytes = aligned_bytes_](MessageHandle message) mutable { + ServerFragmentFrame frame; + // Construct frame header (flags, header_length + // and trailer_length will be added in + // serialization). + const uint32_t message_length = message->payload()->Length(); + const uint32_t padding = + message_length % aligned_bytes == 0 + ? 0 + : aligned_bytes - message_length % aligned_bytes; + GPR_ASSERT((message_length + padding) % aligned_bytes == 0); + frame.message = + FragmentMessage(std::move(message), padding, message_length); + frame.stream_id = stream_id; + return SendFragment(std::move(frame), outgoing_frames, call_initiator); + }); } auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( @@ -179,7 +200,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( frame.headers = std::move(*md); frame.stream_id = stream_id; return TrySeq( - SendFragment(std::move(frame), outgoing_frames), + SendFragment(std::move(frame), outgoing_frames, + call_initiator), SendCallBody(stream_id, outgoing_frames, call_initiator)); }, []() { return absl::OkStatus(); }); @@ -201,11 +223,15 @@ auto ChaoticGoodServerTransport::CallOutboundLoop( return Empty{}; }), call_initiator.PullServerTrailingMetadata(), - [stream_id, outgoing_frames](ServerMetadataHandle md) mutable { + // Capture the call_initator to ensure the underlying call_spine + // is alive until the SendFragment promise completes. + [stream_id, outgoing_frames, + call_initiator](ServerMetadataHandle md) mutable { ServerFragmentFrame frame; frame.trailers = std::move(md); frame.stream_id = stream_id; - return SendFragment(std::move(frame), outgoing_frames); + return SendFragment(std::move(frame), outgoing_frames, + call_initiator); }); } @@ -247,7 +273,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( } } return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), - std::move(fragment_frame)); + std::move(fragment_frame), + frame_header.stream_id); } auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall( @@ -262,7 +289,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall( frame_header, std::move(buffers), arena, fragment_frame, FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1}); return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), - std::move(fragment_frame)); + std::move(fragment_frame), + frame_header.stream_id); } auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) { @@ -421,7 +449,11 @@ absl::Status ChaoticGoodServerTransport::NewStream( if (stream_id <= last_seen_new_stream_id_) { return absl::InternalError("Stream id is not increasing"); } - stream_map_.emplace(stream_id, std::move(call_initiator)); + stream_map_.emplace(stream_id, call_initiator); + call_initiator.OnDone([this, stream_id]() { + MutexLock lock(&mu_); + stream_map_.erase(stream_id); + }); return absl::OkStatus(); } diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 021975d9f15..385dc6cf0b7 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -115,7 +115,8 @@ class ChaoticGoodServerTransport final : public Transport, auto SendCallBody(uint32_t stream_id, MpscSender outgoing_frames, CallInitiator call_initiator); static auto SendFragment(ServerFragmentFrame frame, - MpscSender outgoing_frames); + MpscSender outgoing_frames, + CallInitiator call_initiator); auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); auto OnTransportActivityDone(absl::string_view activity); auto TransportReadLoop(RefCountedPtr transport); @@ -133,9 +134,10 @@ class ChaoticGoodServerTransport final : public Transport, FrameHeader frame_header, BufferPair buffers, ChaoticGoodTransport& transport); auto MaybePushFragmentIntoCall(absl::optional call_initiator, - absl::Status error, ClientFragmentFrame frame); + absl::Status error, ClientFragmentFrame frame, + uint32_t stream_id); auto PushFragmentIntoCall(CallInitiator call_initiator, - ClientFragmentFrame frame); + ClientFragmentFrame frame, uint32_t stream_id); Acceptor* acceptor_ = nullptr; InterActivityLatch got_acceptor_; diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index 1c38575c8b2..51568b4160c 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -296,6 +296,8 @@ class CallInitiator { spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); } + void OnDone(absl::AnyInvocable fn) { spine_->OnDone(std::move(fn)); } + template void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) { spine_->SpawnGuarded(name, std::move(promise_factory));