Revert "[chaotic-good] Fix some ref leaks (#37865)"

This reverts commit e8aa408bba.
pull/37882/head
Craig Tiller 2 months ago
parent 4cf2759096
commit 6e666cd985
  1. 88
      src/core/ext/transport/chaotic_good/server_transport.cc
  2. 4
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 103
      src/core/server/server.cc

@ -125,13 +125,14 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}
auto ChaoticGoodServerTransport::SendFragment(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames) {
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
[](bool success) -> absl::Status {
[call_initiator](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
@ -145,26 +146,27 @@ auto ChaoticGoodServerTransport::SendCallBody(
CallInitiator call_initiator) {
// Continuously send client frame with client to server
// messages.
return ForEach(OutgoingMessages(call_initiator),
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[stream_id, outgoing_frames, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - (message_length % aligned_bytes);
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
});
return ForEach(
OutgoingMessages(call_initiator),
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[stream_id, outgoing_frames, call_initiator,
aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message =
FragmentMessage(std::move(message), padding, message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator);
});
}
auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
@ -185,7 +187,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
frame.headers = std::move(*md);
frame.stream_id = stream_id;
return TrySeq(
SendFragment(std::move(frame), outgoing_frames),
SendFragment(std::move(frame), outgoing_frames,
call_initiator),
SendCallBody(stream_id, outgoing_frames, call_initiator));
},
[]() { return absl::OkStatus(); });
@ -206,11 +209,15 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
[stream_id, outgoing_frames](ServerMetadataHandle md) mutable {
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[stream_id, outgoing_frames,
call_initiator](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
return SendFragment(std::move(frame), outgoing_frames,
call_initiator);
}));
}
@ -302,11 +309,10 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
call_initiator.has_value(),
[&call_initiator]() {
auto c = std::move(*call_initiator);
return c.SpawnWaitable("cancel_from_read",
[c]() mutable {
c.Cancel();
return absl::OkStatus();
});
return c.SpawnWaitable("cancel", [c]() mutable {
c.Cancel();
return absl::OkStatus();
});
},
[]() -> absl::Status { return absl::OkStatus(); });
}),
@ -391,8 +397,6 @@ void ChaoticGoodServerTransport::AbortWithError() {
// Close all the available pipes.
outgoing_frames_.MarkClosed();
ReleasableMutexLock lock(&mu_);
if (aborted_with_error_) return;
aborted_with_error_ = true;
StreamMap stream_map = std::move(stream_map_);
stream_map_.clear();
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
@ -401,19 +405,18 @@ void ChaoticGoodServerTransport::AbortWithError() {
lock.Release();
for (const auto& pair : stream_map) {
auto call_initiator = pair.second;
call_initiator.SpawnInfallible("cancel_from_transport_closed",
[call_initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
}
}
absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
uint32_t stream_id) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " LookupStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
return it->second;
@ -421,9 +424,9 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
uint32_t stream_id) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " ExtractStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
auto r = std::move(it->second);
@ -433,12 +436,9 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
absl::Status ChaoticGoodServerTransport::NewStream(
uint32_t stream_id, CallInitiator call_initiator) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " NewStream " << stream_id;
if (aborted_with_error_) {
return absl::UnavailableError("Transport closed");
}
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return absl::InternalError("Stream already exists");
@ -454,7 +454,7 @@ absl::Status ChaoticGoodServerTransport::NewStream(
self->ExtractStream(stream_id);
if (call_initiator.has_value()) {
auto c = std::move(*call_initiator);
c.SpawnInfallible("cancel_from_on_done", [c]() mutable {
c.SpawnInfallible("cancel", [c]() mutable {
c.Cancel();
return Empty{};
});

@ -109,7 +109,8 @@ class ChaoticGoodServerTransport final : public ServerTransport {
auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
static auto SendFragment(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames);
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
auto OnTransportActivityDone(absl::string_view activity);
auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
@ -143,7 +144,6 @@ class ChaoticGoodServerTransport final : public ServerTransport {
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
bool aborted_with_error_ ABSL_GUARDED_BY(mu_) = false;
RefCountedPtr<Party> party_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
"chaotic_good_server", GRPC_CHANNEL_READY};

@ -813,60 +813,55 @@ absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
} // namespace
auto Server::MatchAndPublishCall(CallHandler call_handler) {
call_handler.SpawnGuardedUntilCallCompletes(
"request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable {
return call_handler.PullMessage();
},
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler,
this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(
WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable { return call_handler.PullMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler, this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>

Loading…
Cancel
Save