From f9a4ee1019afdb4a44dae50256f24bd470e55037 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 10 Oct 2024 14:25:12 -0700 Subject: [PATCH] Revert "[chaotic-good] Fix some ref leaks (#37865)" (#37882) This reverts commit e8aa408bba78552d995ec8d953690b42712c808b. It looks like this increased flakiness... I'm going to roll forward the separate pieces after this. Closes #37882 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37882 from ctiller:flake-fightas-17 6e666cd9856381c77f586cb833bbbbd52bb7834f PiperOrigin-RevId: 684577945 --- .../chaotic_good/server_transport.cc | 88 +++++++-------- .../transport/chaotic_good/server_transport.h | 4 +- src/core/server/server.cc | 103 +++++++++--------- 3 files changed, 95 insertions(+), 100 deletions(-) diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 42c2ddb501e..b166d79380f 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -125,13 +125,14 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( } auto ChaoticGoodServerTransport::SendFragment( - ServerFragmentFrame frame, MpscSender outgoing_frames) { + ServerFragmentFrame frame, MpscSender outgoing_frames, + CallInitiator call_initiator) { GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString(); // Capture the call_initiator to ensure the underlying call spine is alive // until the outgoing_frames.Send promise completes. return Map(outgoing_frames.Send(std::move(frame)), - [](bool success) -> absl::Status { + [call_initiator](bool success) -> absl::Status { if (!success) { // Failed to send outgoing frame. return absl::UnavailableError("Transport closed."); @@ -145,26 +146,27 @@ auto ChaoticGoodServerTransport::SendCallBody( CallInitiator call_initiator) { // Continuously send client frame with client to server // messages. - return ForEach(OutgoingMessages(call_initiator), - // Capture the call_initator to ensure the underlying call - // spine is alive until the SendFragment promise completes. - [stream_id, outgoing_frames, aligned_bytes = aligned_bytes_]( - MessageHandle message) mutable { - ServerFragmentFrame frame; - // Construct frame header (flags, header_length - // and trailer_length will be added in - // serialization). - const uint32_t message_length = message->payload()->Length(); - const uint32_t padding = - message_length % aligned_bytes == 0 - ? 0 - : aligned_bytes - (message_length % aligned_bytes); - CHECK_EQ((message_length + padding) % aligned_bytes, 0u); - frame.message = FragmentMessage(std::move(message), padding, - message_length); - frame.stream_id = stream_id; - return SendFragment(std::move(frame), outgoing_frames); - }); + return ForEach( + OutgoingMessages(call_initiator), + // Capture the call_initator to ensure the underlying call + // spine is alive until the SendFragment promise completes. + [stream_id, outgoing_frames, call_initiator, + aligned_bytes = aligned_bytes_](MessageHandle message) mutable { + ServerFragmentFrame frame; + // Construct frame header (flags, header_length + // and trailer_length will be added in + // serialization). + const uint32_t message_length = message->payload()->Length(); + const uint32_t padding = + message_length % aligned_bytes == 0 + ? 0 + : aligned_bytes - message_length % aligned_bytes; + CHECK_EQ((message_length + padding) % aligned_bytes, 0u); + frame.message = + FragmentMessage(std::move(message), padding, message_length); + frame.stream_id = stream_id; + return SendFragment(std::move(frame), outgoing_frames, call_initiator); + }); } auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( @@ -185,7 +187,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( frame.headers = std::move(*md); frame.stream_id = stream_id; return TrySeq( - SendFragment(std::move(frame), outgoing_frames), + SendFragment(std::move(frame), outgoing_frames, + call_initiator), SendCallBody(stream_id, outgoing_frames, call_initiator)); }, []() { return absl::OkStatus(); }); @@ -206,11 +209,15 @@ auto ChaoticGoodServerTransport::CallOutboundLoop( return Empty{}; }), call_initiator.PullServerTrailingMetadata(), - [stream_id, outgoing_frames](ServerMetadataHandle md) mutable { + // Capture the call_initator to ensure the underlying call_spine + // is alive until the SendFragment promise completes. + [stream_id, outgoing_frames, + call_initiator](ServerMetadataHandle md) mutable { ServerFragmentFrame frame; frame.trailers = std::move(md); frame.stream_id = stream_id; - return SendFragment(std::move(frame), outgoing_frames); + return SendFragment(std::move(frame), outgoing_frames, + call_initiator); })); } @@ -302,11 +309,10 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) { call_initiator.has_value(), [&call_initiator]() { auto c = std::move(*call_initiator); - return c.SpawnWaitable("cancel_from_read", - [c]() mutable { - c.Cancel(); - return absl::OkStatus(); - }); + return c.SpawnWaitable("cancel", [c]() mutable { + c.Cancel(); + return absl::OkStatus(); + }); }, []() -> absl::Status { return absl::OkStatus(); }); }), @@ -391,8 +397,6 @@ void ChaoticGoodServerTransport::AbortWithError() { // Close all the available pipes. outgoing_frames_.MarkClosed(); ReleasableMutexLock lock(&mu_); - if (aborted_with_error_) return; - aborted_with_error_ = true; StreamMap stream_map = std::move(stream_map_); stream_map_.clear(); state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, @@ -401,19 +405,18 @@ void ChaoticGoodServerTransport::AbortWithError() { lock.Release(); for (const auto& pair : stream_map) { auto call_initiator = pair.second; - call_initiator.SpawnInfallible("cancel_from_transport_closed", - [call_initiator]() mutable { - call_initiator.Cancel(); - return Empty{}; - }); + call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable { + call_initiator.Cancel(); + return Empty{}; + }); } } absl::optional ChaoticGoodServerTransport::LookupStream( uint32_t stream_id) { - MutexLock lock(&mu_); GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD " << this << " LookupStream " << stream_id; + MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it == stream_map_.end()) return absl::nullopt; return it->second; @@ -421,9 +424,9 @@ absl::optional ChaoticGoodServerTransport::LookupStream( absl::optional ChaoticGoodServerTransport::ExtractStream( uint32_t stream_id) { - MutexLock lock(&mu_); GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD " << this << " ExtractStream " << stream_id; + MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it == stream_map_.end()) return absl::nullopt; auto r = std::move(it->second); @@ -433,12 +436,9 @@ absl::optional ChaoticGoodServerTransport::ExtractStream( absl::Status ChaoticGoodServerTransport::NewStream( uint32_t stream_id, CallInitiator call_initiator) { - MutexLock lock(&mu_); GRPC_TRACE_LOG(chaotic_good, INFO) << "CHAOTIC_GOOD " << this << " NewStream " << stream_id; - if (aborted_with_error_) { - return absl::UnavailableError("Transport closed"); - } + MutexLock lock(&mu_); auto it = stream_map_.find(stream_id); if (it != stream_map_.end()) { return absl::InternalError("Stream already exists"); @@ -454,7 +454,7 @@ absl::Status ChaoticGoodServerTransport::NewStream( self->ExtractStream(stream_id); if (call_initiator.has_value()) { auto c = std::move(*call_initiator); - c.SpawnInfallible("cancel_from_on_done", [c]() mutable { + c.SpawnInfallible("cancel", [c]() mutable { c.Cancel(); return Empty{}; }); diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 65929533f68..49fd5ae015f 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -109,7 +109,8 @@ class ChaoticGoodServerTransport final : public ServerTransport { auto SendCallBody(uint32_t stream_id, MpscSender outgoing_frames, CallInitiator call_initiator); static auto SendFragment(ServerFragmentFrame frame, - MpscSender outgoing_frames); + MpscSender outgoing_frames, + CallInitiator call_initiator); auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); auto OnTransportActivityDone(absl::string_view activity); auto TransportReadLoop(RefCountedPtr transport); @@ -143,7 +144,6 @@ class ChaoticGoodServerTransport final : public ServerTransport { // Map of stream incoming server frames, key is stream_id. StreamMap stream_map_ ABSL_GUARDED_BY(mu_); uint32_t last_seen_new_stream_id_ = 0; - bool aborted_with_error_ ABSL_GUARDED_BY(mu_) = false; RefCountedPtr party_; ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){ "chaotic_good_server", GRPC_CHANNEL_READY}; diff --git a/src/core/server/server.cc b/src/core/server/server.cc index 87bb72a0dee..5ec784a5c96 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -813,60 +813,55 @@ absl::StatusOr CheckClientMetadata( } // namespace auto Server::MatchAndPublishCall(CallHandler call_handler) { - call_handler.SpawnGuardedUntilCallCompletes( - "request_matcher", [this, call_handler]() mutable { - return TrySeq( - // Wait for initial metadata to pass through all filters - Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata), - // Match request with requested call - [this, call_handler](ClientMetadataHandle md) mutable { - auto* registered_method = static_cast( - md->get(GrpcRegisteredMethod()).value_or(nullptr)); - RequestMatcherInterface* rm; - grpc_server_register_method_payload_handling payload_handling = - GRPC_SRM_PAYLOAD_NONE; - if (registered_method == nullptr) { - rm = unregistered_request_matcher_.get(); - } else { - payload_handling = registered_method->payload_handling; - rm = registered_method->matcher.get(); - } - auto maybe_read_first_message = If( - payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, - [call_handler]() mutable { - return call_handler.PullMessage(); - }, - []() -> ValueOrFailure> { - return ValueOrFailure>( - absl::nullopt); - }); - return TryJoin( - std::move(maybe_read_first_message), rm->MatchRequest(0), - [md = std::move(md)]() mutable { - return ValueOrFailure(std::move(md)); - }); - }, - // Publish call to cq - [call_handler, - this](std::tuple, - RequestMatcherInterface::MatchResult, - ClientMetadataHandle> - r) { - RequestMatcherInterface::MatchResult& mr = std::get<1>(r); - auto md = std::move(std::get<2>(r)); - auto* rc = mr.TakeCall(); - rc->Complete(std::move(std::get<0>(r)), *md); - grpc_call* call = - MakeServerCall(call_handler, std::move(md), this, - rc->cq_bound_to_call, rc->initial_metadata); - *rc->call = call; - return Map( - WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), - [rc = std::unique_ptr(rc)](Empty) { - return absl::OkStatus(); - }); - }); - }); + call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable { + return TrySeq( + // Wait for initial metadata to pass through all filters + Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata), + // Match request with requested call + [this, call_handler](ClientMetadataHandle md) mutable { + auto* registered_method = static_cast( + md->get(GrpcRegisteredMethod()).value_or(nullptr)); + RequestMatcherInterface* rm; + grpc_server_register_method_payload_handling payload_handling = + GRPC_SRM_PAYLOAD_NONE; + if (registered_method == nullptr) { + rm = unregistered_request_matcher_.get(); + } else { + payload_handling = registered_method->payload_handling; + rm = registered_method->matcher.get(); + } + auto maybe_read_first_message = If( + payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, + [call_handler]() mutable { return call_handler.PullMessage(); }, + []() -> ValueOrFailure> { + return ValueOrFailure>( + absl::nullopt); + }); + return TryJoin( + std::move(maybe_read_first_message), rm->MatchRequest(0), + [md = std::move(md)]() mutable { + return ValueOrFailure(std::move(md)); + }); + }, + // Publish call to cq + [call_handler, this](std::tuple, + RequestMatcherInterface::MatchResult, + ClientMetadataHandle> + r) { + RequestMatcherInterface::MatchResult& mr = std::get<1>(r); + auto md = std::move(std::get<2>(r)); + auto* rc = mr.TakeCall(); + rc->Complete(std::move(std::get<0>(r)), *md); + grpc_call* call = + MakeServerCall(call_handler, std::move(md), this, + rc->cq_bound_to_call, rc->initial_metadata); + *rc->call = call; + return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()), + [rc = std::unique_ptr(rc)](Empty) { + return absl::OkStatus(); + }); + }); + }); } absl::StatusOr>