[chaotic-good] Use a party for internal activities (#37078)

Allows use of the party <-> party wakeup batching stuff, which reduces threadhops drastically for this transport.

Closes #37078

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37078 from ctiller:chaotic-party-3 75c32e6a64
PiperOrigin-RevId: 679685211
pull/37584/head
Craig Tiller 2 months ago committed by Copybara-Service
parent 40ff7aa617
commit 6dbc0bdb10
  1. 7
      src/core/BUILD
  2. 39
      src/core/ext/transport/chaotic_good/client_transport.cc
  3. 6
      src/core/ext/transport/chaotic_good/client_transport.h
  4. 29
      src/core/ext/transport/chaotic_good/server_transport.cc
  5. 3
      src/core/ext/transport/chaotic_good/server_transport.h
  6. 11
      src/core/lib/promise/party.cc
  7. 2
      src/core/lib/promise/party.h
  8. 12
      test/core/transport/chaotic_good/client_transport_test.cc
  9. 15
      test/core/transport/chaotic_good/mock_promise_endpoint.h
  10. 4
      test/core/transport/chaotic_good/server_transport_test.cc

@ -7946,7 +7946,7 @@ grpc_cc_library(
"absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/log:check",
"absl/log:log",
"absl/log",
"absl/random",
"absl/random:bit_gen_ref",
"absl/status",
@ -7957,20 +7957,18 @@ grpc_cc_library(
language = "c++",
deps = [
"activity",
"all_ok",
"arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
"chaotic_good_transport",
"context",
"event_engine_wakeup_scheduler",
"event_engine_context",
"for_each",
"grpc_promise_endpoint",
"if",
"inter_activity_pipe",
"loop",
"map",
"match",
"memory_quota",
"metadata_batch",
"mpsc",
@ -7987,7 +7985,6 @@ grpc_cc_library(
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//:promise",
"//:ref_counted_ptr",
],
)

@ -36,22 +36,15 @@
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/all_ok.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/util/match.h"
#include "src/core/util/ref_counted_ptr.h"
namespace grpc_core {
@ -59,15 +52,12 @@ namespace chaotic_good {
void ChaoticGoodClientTransport::Orphan() {
AbortWithError();
ActivityPtr writer;
ActivityPtr reader;
RefCountedPtr<Party> party;
{
MutexLock lock(&mu_);
writer = std::move(writer_);
reader = std::move(reader_);
party = std::move(party_);
}
writer.reset();
reader.reset();
party.reset();
Unref();
}
@ -211,25 +201,18 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport(
auto transport = MakeRefCounted<ChaoticGoodTransport>(
std::move(control_endpoint), std::move(data_endpoint),
std::move(hpack_parser), std::move(hpack_encoder));
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine),
auto party_arena = SimpleArenaAllocator(0)->MakeArena();
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get());
party_ = Party::Make(std::move(party_arena));
party_->Spawn("client-chaotic-writer", TransportWriteLoop(transport),
OnTransportActivityDone("write_loop"));
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
party_->Spawn("client-chaotic-reader",
TransportReadLoop(std::move(transport)),
EventEngineWakeupScheduler(event_engine),
OnTransportActivityDone("read_loop"));
}
ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
if (writer_ != nullptr) {
writer_.reset();
}
if (reader_ != nullptr) {
reader_.reset();
}
}
ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { party_.reset(); }
void ChaoticGoodClientTransport::AbortWithError() {
// Mark transport as unavailable when the endpoint write/read failed.

@ -88,9 +88,6 @@ class ChaoticGoodClientTransport final : public ClientTransport {
void AbortWithError();
private:
// Queue size of each stream pipe is set to 2, so that for each stream read it
// will queue at most 2 frames.
static const size_t kServerFrameQueueSize = 2;
using StreamMap = absl::flat_hash_map<uint32_t, CallHandler>;
uint32_t MakeStream(CallHandler call_handler);
@ -112,8 +109,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
RefCountedPtr<Party> party_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
"chaotic_good_client", GRPC_CHANNEL_READY};
};

@ -356,11 +356,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
auto transport = MakeRefCounted<ChaoticGoodTransport>(
std::move(control_endpoint), std::move(data_endpoint),
std::move(hpack_parser), std::move(hpack_encoder));
writer_ = MakeActivity(TransportWriteLoop(transport),
EventEngineWakeupScheduler(event_engine),
auto party_arena = SimpleArenaAllocator(0)->MakeArena();
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get());
party_ = Party::Make(std::move(party_arena));
party_->Spawn("server-chaotic-writer", TransportWriteLoop(transport),
OnTransportActivityDone("writer"));
reader_ = MakeActivity(TransportReadLoop(std::move(transport)),
EventEngineWakeupScheduler(event_engine),
party_->Spawn("server-chaotic-reader", TransportReadLoop(transport),
OnTransportActivityDone("reader"));
}
@ -373,15 +375,13 @@ void ChaoticGoodServerTransport::SetCallDestination(
}
void ChaoticGoodServerTransport::Orphan() {
ActivityPtr writer;
ActivityPtr reader;
AbortWithError();
RefCountedPtr<Party> party;
{
MutexLock lock(&mu_);
writer = std::move(writer_);
reader = std::move(reader_);
party = std::move(party_);
}
writer.reset();
reader.reset();
party.reset();
Unref();
}
@ -461,7 +461,7 @@ absl::Status ChaoticGoodServerTransport::NewStream(
}
void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) {
std::vector<ActivityPtr> cancelled;
RefCountedPtr<Party> cancelled_party;
MutexLock lock(&mu_);
bool did_stuff = false;
if (op->start_connectivity_watch != nullptr) {
@ -482,8 +482,11 @@ void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) {
did_stuff = true;
}
if (!op->goaway_error.ok() || !op->disconnect_with_error.ok()) {
cancelled.push_back(std::move(writer_));
cancelled.push_back(std::move(reader_));
cancelled_party = std::move(party_);
outgoing_frames_.MarkClosed();
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
absl::UnavailableError("transport closed"),
"transport closed");
did_stuff = true;
}
if (!did_stuff) {

@ -146,8 +146,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
ActivityPtr writer_ ABSL_GUARDED_BY(mu_);
ActivityPtr reader_ ABSL_GUARDED_BY(mu_);
RefCountedPtr<Party> party_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
"chaotic_good_server", GRPC_CHANNEL_READY};
};

@ -149,15 +149,24 @@ Party::Participant::~Participant() {
Party::~Party() {}
void Party::CancelRemainingParticipants() {
if ((state_.load(std::memory_order_relaxed) & kAllocatedMask) == 0) return;
uint64_t prev_state = state_.load(std::memory_order_relaxed);
if ((prev_state & kAllocatedMask) == 0) return;
ScopedActivity activity(this);
promise_detail::Context<Arena> arena_ctx(arena_.get());
uint64_t clear_state = 0;
do {
for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
if (auto* p =
participants_[i].exchange(nullptr, std::memory_order_acquire)) {
clear_state |= 1ull << i << kAllocatedShift;
p->Destroy();
}
}
if (clear_state == 0) return;
} while (!state_.compare_exchange_weak(prev_state, prev_state & ~clear_state,
std::memory_order_acq_rel));
LogStateChange("CancelRemainingParticipants", prev_state,
prev_state & ~clear_state);
}
std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {

@ -430,12 +430,14 @@ void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
template <typename Factory, typename OnComplete>
void Party::Spawn(absl::string_view name, Factory promise_factory,
OnComplete on_complete) {
GRPC_TRACE_LOG(party_state, INFO) << "PARTY[" << this << "]: spawn " << name;
AddParticipant(new ParticipantImpl<Factory, OnComplete>(
name, std::move(promise_factory), std::move(on_complete)));
}
template <typename Factory>
auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) {
GRPC_TRACE_LOG(party_state, INFO) << "PARTY[" << this << "]: spawn " << name;
auto participant = MakeRefCounted<PromiseParticipantImpl<Factory>>(
name, std::move(promise_factory));
Participant* p = participant->Ref().release();

@ -102,8 +102,8 @@ ChannelArgs MakeChannelArgs() {
}
TEST_F(TransportTest, AddOneStream) {
MockPromiseEndpoint control_endpoint;
MockPromiseEndpoint data_endpoint;
MockPromiseEndpoint control_endpoint(1000);
MockPromiseEndpoint data_endpoint(1001);
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kFragment, 7, 1, 26, 8, 56, 15),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
@ -120,7 +120,6 @@ TEST_F(TransportTest, AddOneStream) {
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.StartCall());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -136,6 +135,7 @@ TEST_F(TransportTest, AddOneStream) {
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
transport->StartCall(call.handler.StartCall());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
@ -183,8 +183,8 @@ TEST_F(TransportTest, AddOneStream) {
}
TEST_F(TransportTest, AddOneStreamMultipleMessages) {
MockPromiseEndpoint control_endpoint;
MockPromiseEndpoint data_endpoint;
MockPromiseEndpoint control_endpoint(1000);
MockPromiseEndpoint data_endpoint(1001);
control_endpoint.ExpectRead(
{SerializedFrameHeader(FrameType::kFragment, 3, 1, 26, 8, 56, 0),
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep,
@ -206,7 +206,6 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call = MakeCall(TestInitialMetadata());
transport->StartCall(call.handler.StartCall());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -227,6 +226,7 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
{EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
transport->StartCall(call.handler.StartCall());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 2);

@ -20,6 +20,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/transport/promise_endpoint.h"
namespace grpc_core {
@ -54,6 +55,20 @@ class MockEndpoint
};
struct MockPromiseEndpoint {
explicit MockPromiseEndpoint(int port) {
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) {
EXPECT_CALL(*endpoint, GetPeerAddress)
.WillRepeatedly(
[peer_address =
std::make_shared<grpc_event_engine::experimental::
EventEngine::ResolvedAddress>(
grpc_event_engine::experimental::URIToResolvedAddress(
absl::StrCat("ipv4:127.0.0.1:", port))
.value())]()
-> const grpc_event_engine::experimental::EventEngine::
ResolvedAddress& { return *peer_address; });
}
}
::testing::StrictMock<MockEndpoint>* endpoint =
new ::testing::StrictMock<MockEndpoint>();
PromiseEndpoint promise_endpoint = PromiseEndpoint(

@ -91,8 +91,8 @@ class MockCallDestination : public UnstartedCallDestination {
};
TEST_F(TransportTest, ReadAndWriteOneMessage) {
MockPromiseEndpoint control_endpoint;
MockPromiseEndpoint data_endpoint;
MockPromiseEndpoint control_endpoint(1);
MockPromiseEndpoint data_endpoint(2);
auto call_destination = MakeRefCounted<StrictMock<MockCallDestination>>();
EXPECT_CALL(*call_destination, Orphaned()).Times(1);
auto transport = MakeOrphanable<ChaoticGoodServerTransport>(

Loading…
Cancel
Save