[chaotic-good] Remove ref to server side call early (#36090)

Closes #36090

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36090 from Vignesh2208:chaotic-good-fix 984ec1cdc6
PiperOrigin-RevId: 615920404
pull/36129/head
Vignesh Babu 9 months ago committed by Copybara-Service
parent db51a3f69a
commit d8f0f64ee6
  1. 108
      src/core/ext/transport/chaotic_good/server_transport.cc
  2. 8
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 2
      src/core/lib/transport/call_spine.h

@ -71,7 +71,8 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
}
auto ChaoticGoodServerTransport::PushFragmentIntoCall(
CallInitiator call_initiator, ClientFragmentFrame frame) {
CallInitiator call_initiator, ClientFragmentFrame frame,
uint32_t stream_id) {
auto& headers = frame.headers;
return TrySeq(
If(
@ -88,44 +89,61 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall(
},
[]() -> StatusFlag { return Success{}; });
},
[call_initiator,
end_of_stream = frame.end_of_stream]() mutable -> StatusFlag {
if (end_of_stream) call_initiator.FinishSends();
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id]() mutable -> StatusFlag {
if (end_of_stream) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove the call
// from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
}
return Success{};
});
}
auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
absl::optional<CallInitiator> call_initiator, absl::Status error,
ClientFragmentFrame frame) {
ClientFragmentFrame frame, uint32_t stream_id) {
return If(
call_initiator.has_value() && error.ok(),
[this, &call_initiator, &frame]() {
[this, &call_initiator, &frame, &stream_id]() {
return Map(
call_initiator->SpawnWaitable(
"push-fragment",
[call_initiator, frame = std::move(frame), this]() mutable {
return call_initiator->CancelIfFails(
PushFragmentIntoCall(*call_initiator, std::move(frame)));
[call_initiator, frame = std::move(frame), stream_id,
this]() mutable {
return call_initiator->CancelIfFails(PushFragmentIntoCall(
*call_initiator, std::move(frame), stream_id));
}),
[](StatusFlag status) { return StatusCast<absl::Status>(status); });
},
[&error, &frame]() {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s",
error.ToString().c_str(), frame.ToString().c_str());
// EOF frames may arrive after the call_initiator's OnDone callback
// has been invoked. In that case, the call_initiator would have
// already been removed from the stream_map and hence the EOF frame
// cannot be pushed into the call. No need to log such frames.
if (!frame.end_of_stream) {
gpr_log(
GPR_INFO,
"CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s",
error.ToString().c_str(), frame.ToString().c_str());
}
return Immediate(std::move(error));
});
}
auto ChaoticGoodServerTransport::SendFragment(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames) {
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s",
frame.ToString().c_str());
}
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
[](bool success) -> absl::Status {
[call_initiator](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
@ -139,24 +157,27 @@ auto ChaoticGoodServerTransport::SendCallBody(
CallInitiator call_initiator) {
// Continuously send client frame with client to server
// messages.
return ForEach(OutgoingMessages(call_initiator),
[stream_id, outgoing_frames, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
});
return ForEach(
OutgoingMessages(call_initiator),
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[stream_id, outgoing_frames, call_initiator,
aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
frame.message =
FragmentMessage(std::move(message), padding, message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator);
});
}
auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
@ -179,7 +200,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
frame.headers = std::move(*md);
frame.stream_id = stream_id;
return TrySeq(
SendFragment(std::move(frame), outgoing_frames),
SendFragment(std::move(frame), outgoing_frames,
call_initiator),
SendCallBody(stream_id, outgoing_frames, call_initiator));
},
[]() { return absl::OkStatus(); });
@ -201,11 +223,15 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
[stream_id, outgoing_frames](ServerMetadataHandle md) mutable {
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[stream_id, outgoing_frames,
call_initiator](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
return SendFragment(std::move(frame), outgoing_frames,
call_initiator);
});
}
@ -247,7 +273,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
}
}
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
std::move(fragment_frame));
std::move(fragment_frame),
frame_header.stream_id);
}
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
@ -262,7 +289,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
frame_header, std::move(buffers), arena, fragment_frame,
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
std::move(fragment_frame));
std::move(fragment_frame),
frame_header.stream_id);
}
auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
@ -421,7 +449,11 @@ absl::Status ChaoticGoodServerTransport::NewStream(
if (stream_id <= last_seen_new_stream_id_) {
return absl::InternalError("Stream id is not increasing");
}
stream_map_.emplace(stream_id, std::move(call_initiator));
stream_map_.emplace(stream_id, call_initiator);
call_initiator.OnDone([this, stream_id]() {
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
});
return absl::OkStatus();
}

@ -115,7 +115,8 @@ class ChaoticGoodServerTransport final : public Transport,
auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
static auto SendFragment(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames);
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
auto OnTransportActivityDone(absl::string_view activity);
auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
@ -133,9 +134,10 @@ class ChaoticGoodServerTransport final : public Transport,
FrameHeader frame_header, BufferPair buffers,
ChaoticGoodTransport& transport);
auto MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator,
absl::Status error, ClientFragmentFrame frame);
absl::Status error, ClientFragmentFrame frame,
uint32_t stream_id);
auto PushFragmentIntoCall(CallInitiator call_initiator,
ClientFragmentFrame frame);
ClientFragmentFrame frame, uint32_t stream_id);
Acceptor* acceptor_ = nullptr;
InterActivityLatch<void> got_acceptor_;

@ -296,6 +296,8 @@ class CallInitiator {
spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError()));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));

Loading…
Cancel
Save