From 6dbc0bdb106b3b2c319198194ca7b6f18d88f8ff Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 27 Sep 2024 12:20:36 -0700 Subject: [PATCH] [chaotic-good] Use a party for internal activities (#37078) Allows use of the party <-> party wakeup batching stuff, which reduces threadhops drastically for this transport. Closes #37078 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37078 from ctiller:chaotic-party-3 75c32e6a6467f41e24d3786980a4aed3e8c93a4c PiperOrigin-RevId: 679685211 --- src/core/BUILD | 7 +-- .../chaotic_good/client_transport.cc | 45 ++++++------------- .../transport/chaotic_good/client_transport.h | 6 +-- .../chaotic_good/server_transport.cc | 33 +++++++------- .../transport/chaotic_good/server_transport.h | 3 +- src/core/lib/promise/party.cc | 21 ++++++--- src/core/lib/promise/party.h | 2 + .../chaotic_good/client_transport_test.cc | 12 ++--- .../chaotic_good/mock_promise_endpoint.h | 15 +++++++ .../chaotic_good/server_transport_test.cc | 4 +- 10 files changed, 76 insertions(+), 72 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 13091d6bd38..a7ae31fe639 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7946,7 +7946,7 @@ grpc_cc_library( "absl/base:core_headers", "absl/container:flat_hash_map", "absl/log:check", - "absl/log:log", + "absl/log", "absl/random", "absl/random:bit_gen_ref", "absl/status", @@ -7957,20 +7957,18 @@ grpc_cc_library( language = "c++", deps = [ "activity", - "all_ok", "arena", "chaotic_good_frame", "chaotic_good_frame_header", "chaotic_good_transport", "context", - "event_engine_wakeup_scheduler", + "event_engine_context", "for_each", "grpc_promise_endpoint", "if", "inter_activity_pipe", "loop", "map", - "match", "memory_quota", "metadata_batch", "mpsc", @@ -7987,7 +7985,6 @@ grpc_cc_library( "//:grpc_base", "//:hpack_encoder", "//:hpack_parser", - "//:promise", "//:ref_counted_ptr", ], ) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 17aff601d45..28af0d5ba98 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -36,22 +36,15 @@ #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/lib/event_engine/event_engine_context.h" #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/promise/activity.h" -#include "src/core/lib/promise/all_ok.h" -#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" #include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/map.h" -#include "src/core/lib/promise/promise.h" -#include "src/core/lib/promise/try_join.h" #include "src/core/lib/promise/try_seq.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/resource_quota.h" -#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" -#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/promise_endpoint.h" -#include "src/core/util/match.h" #include "src/core/util/ref_counted_ptr.h" namespace grpc_core { @@ -59,15 +52,12 @@ namespace chaotic_good { void ChaoticGoodClientTransport::Orphan() { AbortWithError(); - ActivityPtr writer; - ActivityPtr reader; + RefCountedPtr party; { MutexLock lock(&mu_); - writer = std::move(writer_); - reader = std::move(reader_); + party = std::move(party_); } - writer.reset(); - reader.reset(); + party.reset(); Unref(); } @@ -211,25 +201,18 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport( auto transport = MakeRefCounted( std::move(control_endpoint), std::move(data_endpoint), std::move(hpack_parser), std::move(hpack_encoder)); - writer_ = MakeActivity( - // Continuously write next outgoing frames to promise endpoints. - TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine), - OnTransportActivityDone("write_loop")); - reader_ = MakeActivity( - // Continuously read next incoming frames from promise endpoints. - TransportReadLoop(std::move(transport)), - EventEngineWakeupScheduler(event_engine), - OnTransportActivityDone("read_loop")); + auto party_arena = SimpleArenaAllocator(0)->MakeArena(); + party_arena->SetContext( + event_engine.get()); + party_ = Party::Make(std::move(party_arena)); + party_->Spawn("client-chaotic-writer", TransportWriteLoop(transport), + OnTransportActivityDone("write_loop")); + party_->Spawn("client-chaotic-reader", + TransportReadLoop(std::move(transport)), + OnTransportActivityDone("read_loop")); } -ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { - if (writer_ != nullptr) { - writer_.reset(); - } - if (reader_ != nullptr) { - reader_.reset(); - } -} +ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { party_.reset(); } void ChaoticGoodClientTransport::AbortWithError() { // Mark transport as unavailable when the endpoint write/read failed. diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 8f2941947d3..f223c551002 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -88,9 +88,6 @@ class ChaoticGoodClientTransport final : public ClientTransport { void AbortWithError(); private: - // Queue size of each stream pipe is set to 2, so that for each stream read it - // will queue at most 2 frames. - static const size_t kServerFrameQueueSize = 2; using StreamMap = absl::flat_hash_map; uint32_t MakeStream(CallHandler call_handler); @@ -112,8 +109,7 @@ class ChaoticGoodClientTransport final : public ClientTransport { uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; // Map of stream incoming server frames, key is stream_id. StreamMap stream_map_ ABSL_GUARDED_BY(mu_); - ActivityPtr writer_; - ActivityPtr reader_; + RefCountedPtr party_; ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){ "chaotic_good_client", GRPC_CHANNEL_READY}; }; diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 4110af2fbcc..6588227a264 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -356,12 +356,14 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport( auto transport = MakeRefCounted( std::move(control_endpoint), std::move(data_endpoint), std::move(hpack_parser), std::move(hpack_encoder)); - writer_ = MakeActivity(TransportWriteLoop(transport), - EventEngineWakeupScheduler(event_engine), - OnTransportActivityDone("writer")); - reader_ = MakeActivity(TransportReadLoop(std::move(transport)), - EventEngineWakeupScheduler(event_engine), - OnTransportActivityDone("reader")); + auto party_arena = SimpleArenaAllocator(0)->MakeArena(); + party_arena->SetContext( + event_engine.get()); + party_ = Party::Make(std::move(party_arena)); + party_->Spawn("server-chaotic-writer", TransportWriteLoop(transport), + OnTransportActivityDone("writer")); + party_->Spawn("server-chaotic-reader", TransportReadLoop(transport), + OnTransportActivityDone("reader")); } void ChaoticGoodServerTransport::SetCallDestination( @@ -373,15 +375,13 @@ void ChaoticGoodServerTransport::SetCallDestination( } void ChaoticGoodServerTransport::Orphan() { - ActivityPtr writer; - ActivityPtr reader; + AbortWithError(); + RefCountedPtr party; { MutexLock lock(&mu_); - writer = std::move(writer_); - reader = std::move(reader_); + party = std::move(party_); } - writer.reset(); - reader.reset(); + party.reset(); Unref(); } @@ -461,7 +461,7 @@ absl::Status ChaoticGoodServerTransport::NewStream( } void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) { - std::vector cancelled; + RefCountedPtr cancelled_party; MutexLock lock(&mu_); bool did_stuff = false; if (op->start_connectivity_watch != nullptr) { @@ -482,8 +482,11 @@ void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) { did_stuff = true; } if (!op->goaway_error.ok() || !op->disconnect_with_error.ok()) { - cancelled.push_back(std::move(writer_)); - cancelled.push_back(std::move(reader_)); + cancelled_party = std::move(party_); + outgoing_frames_.MarkClosed(); + state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, + absl::UnavailableError("transport closed"), + "transport closed"); did_stuff = true; } if (!did_stuff) { diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 23909768b68..d8244ab32fe 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -146,8 +146,7 @@ class ChaoticGoodServerTransport final : public ServerTransport { // Map of stream incoming server frames, key is stream_id. StreamMap stream_map_ ABSL_GUARDED_BY(mu_); uint32_t last_seen_new_stream_id_ = 0; - ActivityPtr writer_ ABSL_GUARDED_BY(mu_); - ActivityPtr reader_ ABSL_GUARDED_BY(mu_); + RefCountedPtr party_; ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){ "chaotic_good_server", GRPC_CHANNEL_READY}; }; diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index cfc34ebedee..79c310c9806 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -149,15 +149,24 @@ Party::Participant::~Participant() { Party::~Party() {} void Party::CancelRemainingParticipants() { - if ((state_.load(std::memory_order_relaxed) & kAllocatedMask) == 0) return; + uint64_t prev_state = state_.load(std::memory_order_relaxed); + if ((prev_state & kAllocatedMask) == 0) return; ScopedActivity activity(this); promise_detail::Context arena_ctx(arena_.get()); - for (size_t i = 0; i < party_detail::kMaxParticipants; i++) { - if (auto* p = - participants_[i].exchange(nullptr, std::memory_order_acquire)) { - p->Destroy(); + uint64_t clear_state = 0; + do { + for (size_t i = 0; i < party_detail::kMaxParticipants; i++) { + if (auto* p = + participants_[i].exchange(nullptr, std::memory_order_acquire)) { + clear_state |= 1ull << i << kAllocatedShift; + p->Destroy(); + } } - } + if (clear_state == 0) return; + } while (!state_.compare_exchange_weak(prev_state, prev_state & ~clear_state, + std::memory_order_acq_rel)); + LogStateChange("CancelRemainingParticipants", prev_state, + prev_state & ~clear_state); } std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const { diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index a38467a924b..d2051aa6083 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -430,12 +430,14 @@ void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, template void Party::Spawn(absl::string_view name, Factory promise_factory, OnComplete on_complete) { + GRPC_TRACE_LOG(party_state, INFO) << "PARTY[" << this << "]: spawn " << name; AddParticipant(new ParticipantImpl( name, std::move(promise_factory), std::move(on_complete))); } template auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) { + GRPC_TRACE_LOG(party_state, INFO) << "PARTY[" << this << "]: spawn " << name; auto participant = MakeRefCounted>( name, std::move(promise_factory)); Participant* p = participant->Ref().release(); diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index d4f7037b733..aad18ee1601 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -102,8 +102,8 @@ ChannelArgs MakeChannelArgs() { } TEST_F(TransportTest, AddOneStream) { - MockPromiseEndpoint control_endpoint; - MockPromiseEndpoint data_endpoint; + MockPromiseEndpoint control_endpoint(1000); + MockPromiseEndpoint data_endpoint(1001); control_endpoint.ExpectRead( {SerializedFrameHeader(FrameType::kFragment, 7, 1, 26, 8, 56, 15), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, @@ -120,7 +120,6 @@ TEST_F(TransportTest, AddOneStream) { std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); auto call = MakeCall(TestInitialMetadata()); - transport->StartCall(call.handler.StartCall()); StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( @@ -136,6 +135,7 @@ TEST_F(TransportTest, AddOneStream) { {EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); + transport->StartCall(call.handler.StartCall()); call.initiator.SpawnGuarded("test-send", [initiator = call.initiator]() mutable { return SendClientToServerMessages(initiator, 1); @@ -183,8 +183,8 @@ TEST_F(TransportTest, AddOneStream) { } TEST_F(TransportTest, AddOneStreamMultipleMessages) { - MockPromiseEndpoint control_endpoint; - MockPromiseEndpoint data_endpoint; + MockPromiseEndpoint control_endpoint(1000); + MockPromiseEndpoint data_endpoint(1001); control_endpoint.ExpectRead( {SerializedFrameHeader(FrameType::kFragment, 3, 1, 26, 8, 56, 0), EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, @@ -206,7 +206,6 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { std::move(data_endpoint.promise_endpoint), MakeChannelArgs(), event_engine(), HPackParser(), HPackCompressor()); auto call = MakeCall(TestInitialMetadata()); - transport->StartCall(call.handler.StartCall()); StrictMock> on_done; EXPECT_CALL(on_done, Call()); control_endpoint.ExpectWrite( @@ -227,6 +226,7 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { {EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr); control_endpoint.ExpectWrite( {SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr); + transport->StartCall(call.handler.StartCall()); call.initiator.SpawnGuarded("test-send", [initiator = call.initiator]() mutable { return SendClientToServerMessages(initiator, 2); diff --git a/test/core/transport/chaotic_good/mock_promise_endpoint.h b/test/core/transport/chaotic_good/mock_promise_endpoint.h index d90133feb9d..bf073e4644f 100644 --- a/test/core/transport/chaotic_good/mock_promise_endpoint.h +++ b/test/core/transport/chaotic_good/mock_promise_endpoint.h @@ -20,6 +20,7 @@ #include +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/transport/promise_endpoint.h" namespace grpc_core { @@ -54,6 +55,20 @@ class MockEndpoint }; struct MockPromiseEndpoint { + explicit MockPromiseEndpoint(int port) { + if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { + EXPECT_CALL(*endpoint, GetPeerAddress) + .WillRepeatedly( + [peer_address = + std::make_shared( + grpc_event_engine::experimental::URIToResolvedAddress( + absl::StrCat("ipv4:127.0.0.1:", port)) + .value())]() + -> const grpc_event_engine::experimental::EventEngine:: + ResolvedAddress& { return *peer_address; }); + } + } ::testing::StrictMock* endpoint = new ::testing::StrictMock(); PromiseEndpoint promise_endpoint = PromiseEndpoint( diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index af5b223c2fb..f3ab7f3a350 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -91,8 +91,8 @@ class MockCallDestination : public UnstartedCallDestination { }; TEST_F(TransportTest, ReadAndWriteOneMessage) { - MockPromiseEndpoint control_endpoint; - MockPromiseEndpoint data_endpoint; + MockPromiseEndpoint control_endpoint(1); + MockPromiseEndpoint data_endpoint(2); auto call_destination = MakeRefCounted>(); EXPECT_CALL(*call_destination, Orphaned()).Times(1); auto transport = MakeOrphanable(