From 51aa3d49517004c327a4e38f7f29a2d3409f9fbb Mon Sep 17 00:00:00 2001 From: Nana Pang Date: Fri, 10 Nov 2023 05:16:32 +0000 Subject: [PATCH] Rewrite the server transport with gRPC Call 3.0 design. --- build_autogenerated.yaml | 1 - src/core/BUILD | 4 +- .../chaotic_good/server_transport.cc | 43 +--- .../transport/chaotic_good/server_transport.h | 169 ++++++++----- src/core/lib/promise/inter_activity_pipe.h | 6 + test/core/transport/chaotic_good/BUILD | 1 - .../chaotic_good/server_transport_test.cc | 230 ++++++++---------- 7 files changed, 220 insertions(+), 234 deletions(-) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index c4aaa0c3728..a799f3aa835 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -14674,7 +14674,6 @@ targets: - src/core/ext/transport/chaotic_good/server_transport.h - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h - - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h diff --git a/src/core/BUILD b/src/core/BUILD index 12ef854f5d4..8d1077ab546 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6275,7 +6275,6 @@ grpc_cc_library( "ext/transport/chaotic_good/server_transport.h", ], external_deps = [ - "absl/base:core_headers", "absl/functional:any_invocable", "absl/random", "absl/random:bit_gen_ref", @@ -6287,18 +6286,17 @@ grpc_cc_library( deps = [ "activity", "arena", - "arena_promise", "chaotic_good_frame", "chaotic_good_frame_header", "context", "event_engine_wakeup_scheduler", "grpc_promise_endpoint", - "join", "loop", "memory_quota", "mpsc", "pipe", "resource_quota", + "seq", "slice", "slice_buffer", "try_join", diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc index 7727d9b60af..a844f7eaef2 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.cc +++ b/src/core/ext/transport/chaotic_good/server_transport.cc @@ -37,7 +37,6 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" -#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/try_join.h" #include "src/core/lib/promise/try_seq.h" @@ -52,13 +51,12 @@ namespace grpc_core { namespace chaotic_good { ServerTransport::ServerTransport( - absl::AnyInvocable(CallArgs)> - start_receive_callback, std::unique_ptr control_endpoint, std::unique_ptr data_endpoint, - std::shared_ptr event_engine) - : outgoing_frames_(MpscReceiver(4)), - start_receive_callback_(std::move(start_receive_callback)), + std::shared_ptr event_engine, + AcceptFn accept_fn) + : accept_fn_(std::move(accept_fn)), + outgoing_frames_(MpscReceiver(4)), control_endpoint_(std::move(control_endpoint)), data_endpoint_(std::move(data_endpoint)), control_endpoint_write_buffer_(SliceBuffer()), @@ -121,9 +119,7 @@ ServerTransport::ServerTransport( // Continuously write next outgoing frames to promise endpoints. std::move(write_loop), EventEngineWakeupScheduler(event_engine_), [this](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); - if (status.code() == absl::StatusCode::kInternal) { + if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { this->AbortWithError(); } }, @@ -167,26 +163,13 @@ ServerTransport::ServerTransport( GPR_ASSERT(status.ok()); auto message = arena_->MakePooled( std::move(data_endpoint_read_buffer_), 0); - // Construct call args for stream. - auto call_data = ConstructCallData(frame.frame_header.stream_id); - auto call_args = - CallArgs{std::move(frame.headers), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &call_data->pipe_server_intial_metadata_.sender, - &call_data->pipe_client_to_server_messages_.receiver, - &call_data->pipe_server_to_client_messages_.sender}; - return Join( - // Push message into pipe_client_to_server_messages_. - call_data->pipe_client_to_server_messages_.sender.Push( - std::move(message)), - // Execute start_receive_callback. - start_receive_callback_(std::move(call_args))); + // Initialize call. + auto call_initiator = accept_fn_(*frame.headers); + AddCall(call_initiator); + return call_initiator.PushClientToServerMessage(std::move(message)); }, - [](std::tuple ret) - -> LoopCtl { - // TODO(ladynana): figure out what to do with ServerMetadataHandle. - if (std::get<0>(ret)) { + [](bool ret) -> LoopCtl { + if (ret) { return Continue(); } else { return absl::InternalError("Send message to pipe failed."); @@ -197,9 +180,7 @@ ServerTransport::ServerTransport( // Continuously read next incoming frames from promise endpoints. std::move(read_loop), EventEngineWakeupScheduler(event_engine_), [this](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); - if (status.code() == absl::StatusCode::kInternal) { + if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { this->AbortWithError(); } }, diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h index 2c995d0f0c4..8e6ecc0ee79 100644 --- a/src/core/ext/transport/chaotic_good/server_transport.h +++ b/src/core/ext/transport/chaotic_good/server_transport.h @@ -21,12 +21,13 @@ #include #include // IWYU pragma: keep -#include #include #include +#include -#include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/types/variant.h" #include #include @@ -35,30 +36,79 @@ #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" -#include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/context.h" +#include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/try_seq.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/promise_endpoint.h" #include "src/core/lib/transport/transport.h" namespace grpc_core { namespace chaotic_good { +// Prototype based on gRPC Call 3.0. +// TODO(ladynana): convert to the true Call/CallInitiator once available. +struct CallData { + uint32_t stream_id; + Pipe pipe_client_to_server_messages_; + Pipe pipe_server_to_client_messages_; + Pipe pipe_server_intial_metadata_; +}; +class CallInitiator { + public: + explicit CallInitiator(std::unique_ptr call_data) + : call_(std::move(call_data)) {} + // Returns a promise that push/pull message/metadata from corresponding pipe. + auto PushServerInitialMetadata(ServerMetadataHandle metadata) { + return call_->pipe_server_intial_metadata_.sender.Push(std::move(metadata)); + } + auto PushServerToClientMessage(MessageHandle message) { + return call_->pipe_server_to_client_messages_.sender.Push( + std::move(message)); + } + auto PushClientToServerMessage(MessageHandle message) { + return call_->pipe_client_to_server_messages_.sender.Push( + std::move(message)); + } + auto PullServerInitialMetadata() { + return call_->pipe_server_intial_metadata_.receiver.Next(); + } + auto PullServerToClientMessage() { + return call_->pipe_server_to_client_messages_.receiver.Next(); + } + auto PullClientToServerMessage() { + return call_->pipe_client_to_server_messages_.receiver.Next(); + } + uint32_t GetStreamId() { return call_->stream_id; } + void SetCall(std::unique_ptr call_data) { + call_ = std::move(call_data); + } + template + void Spawn(Promise p) { + // TODO(ladynana): call/implement Spawn method such as party->Spawn. + // party_->Spawn("Run promise", std::move(p), [](){return + // absl::OkStatus();}); + } + + private: + std::unique_ptr call_; +}; + class ServerTransport { public: - ServerTransport( - absl::AnyInvocable(CallArgs)> - start_receive_callback, - std::unique_ptr control_endpoint, - std::unique_ptr data_endpoint, - std::shared_ptr - event_engine); + using AcceptFn = absl::AnyInvocable; + ServerTransport(std::unique_ptr control_endpoint, + std::unique_ptr data_endpoint, + std::shared_ptr + event_engine, + AcceptFn accept_fn); ~ServerTransport() { if (writer_ != nullptr) { writer_.reset(); @@ -73,66 +123,59 @@ class ServerTransport { if (!outgoing_frames_.IsClosed()) { outgoing_frames_.MarkClosed(); } - std::map> stream_map; - { - MutexLock lock(&mu_); - stream_map = stream_map_; - } - for (const auto& pair : stream_map) { - auto call_data = pair.second; - call_data->pipe_client_to_server_messages_.receiver.CloseWithError(); - call_data->pipe_server_intial_metadata_.sender.CloseWithError(); - call_data->pipe_server_to_client_messages_.sender.CloseWithError(); - } + // MutexLock lock(&mu_); + // for (const auto& pair : stream_map_) { + // if (!pair.second->IsClose()) { + // pair.second->MarkClose(); + // } + // } } - // Prototype of what start_receive_callback will do. - // ArenaPromise start_receive_callback (CallArgs - // callargs){ - // return TrySeq( - // ProcessClientInitialMetadata(callargs.client_initial_metadata), - // ForEach(callargs.client_to_server_messages, - // [](MessageHandle message){ - // ProcessClientMessage(); - // }), - // // Send server initial metadata to client. - // callargs.server_initial_metadata->Push(md), - // // Send server message to client. - // callargs.server_to_client_messages->Push(md) - // ); - // } - private: - struct CallData { - Pipe pipe_client_to_server_messages_; - Pipe pipe_server_to_client_messages_; - Pipe pipe_server_intial_metadata_; - }; - // Construct call data of each stream - CallData* ConstructCallData(uint32_t stream_id) { - MutexLock lock(&mu_); - auto iter = stream_map_.find(stream_id); - if (iter != stream_map_.end()) { - return stream_map_[stream_id].get(); - } else { - CallData call_data{Pipe(arena_.get()), - Pipe(arena_.get()), - Pipe(arena_.get())}; - stream_map_[stream_id] = std::make_shared(std::move(call_data)); - return stream_map_[stream_id].get(); - } + void AddCall(CallInitiator& r) { + // Add server write promise. + auto server_write = Loop([&r, this] { + return TrySeq( + // TODO(ladynana): add initial metadata in server frame. + r.PullServerToClientMessage(), + [this, stream_id = r.GetStreamId()]( + NextResult result) mutable { + auto outgoing_frames = outgoing_frames_.MakeSender(); + ServerFragmentFrame frame; + uint32_t message_length = result.value()->payload()->Length(); + uint32_t message_padding = message_length % aligned_bytes; + frame.frame_header = FrameHeader{ + FrameType::kFragment, {}, stream_id, 0, message_length, + message_padding, 0}; + frame.message = std::move(*result); + return outgoing_frames.Send(ServerFrame(std::move(frame))); + }, + [](bool success) -> LoopCtl { + if (!success) { + // TODO(ladynana): propagate the actual error message + // from EventEngine. + return absl::UnavailableError( + "Transport closed due to endpoint write/read " + "failed."); + } + return Continue(); + }); + }); + r.Spawn(std::move(server_write)); } + AcceptFn accept_fn_; // Max buffer is set to 4, so that for stream writes each time it will queue // at most 2 frames. MpscReceiver outgoing_frames_; static const size_t client_frame_queue_size_ = 2; - Mutex mu_; - uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; - // Map of stream outgoing client frames, key is stream_id. - std::map> stream_map_ - ABSL_GUARDED_BY(mu_); - absl::AnyInvocable(CallArgs)> - start_receive_callback_; + // Assigned aligned bytes from setting frame. + size_t aligned_bytes = 64; + // Mutex mu_; + // // Map of outgoing client frames, key is stream_id. + // std::map::Receiver>> + // stream_map_ + // ABSL_GUARDED_BY(mu_); ActivityPtr writer_; ActivityPtr reader_; std::unique_ptr control_endpoint_; diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h index a7594fb26a2..146bdcd524d 100644 --- a/src/core/lib/promise/inter_activity_pipe.h +++ b/src/core/lib/promise/inter_activity_pipe.h @@ -146,6 +146,12 @@ class InterActivityPipe { return [center = center_]() { return center->Next(); }; } + bool IsClose() { return center_->IsClosed(); } + + void MarkClose() { + if (center_ != nullptr) center_->MarkClosed(); + } + private: RefCountedPtr
center_; }; diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index d75990b2233..394da0f3bee 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -183,7 +183,6 @@ grpc_cc_test( "//src/core:memory_quota", "//src/core:pipe", "//src/core:resource_quota", - "//src/core:seq", "//src/core:slice", "//src/core:slice_buffer", "//test/core/event_engine/fuzzing_event_engine", diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index 4ca8a4bcd93..542f5e0bf63 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -41,11 +41,9 @@ #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/seq.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/resource_quota/resource_quota.h" -#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep #include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep @@ -108,54 +106,103 @@ class ServerTransportTest : public ::testing::Test { return options; }(), fuzzing_event_engine::Actions())), - arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)) {} - // Add expectations of control/data endpoints write/read operations. - void AddExpectations(int num_of_streams, int successful_write_messages, - int successful_read_messages, bool expect_write_failed, - bool expect_read_failed) { - for (int i = 1; i <= num_of_streams; i++) { - AddReadExpectations(/*stream_id*/ i, successful_read_messages, - expect_read_failed); - } - if (expect_read_failed) { - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_read_sequence) - .WillOnce( - WithArgs<0>([](absl::AnyInvocable on_read) { - // Mock EventEngine enpoint read fails. - on_read(absl::InternalError("control endpoint read failed.")); - return false; - })); - } else { - // reader_ is pending for next read. - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_read_sequence); - } - } + arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), + pipe_client_to_server_messages_(arena_.get()), + pipe_server_to_client_messages_(arena_.get()), + pipe_server_intial_metadata_(arena_.get()) {} void InitialServerTransport() { - // Read expectaions need to be added before transport initialization since - // reader_ activity loop is started in ServerTransport initialization, + auto accept_fn = [this](ClientMetadata& md) { + CallData call_data{1, Pipe(arena_.get()), + Pipe(arena_.get()), + Pipe(arena_.get())}; + CallInitiator call_initiator( + std::make_unique(std::move(call_data))); + return call_initiator; + }; server_transport_ = std::make_unique( - std::move(start_receive_callback_), std::make_unique( std::unique_ptr(control_endpoint_ptr_), SliceBuffer()), std::make_unique( std::unique_ptr(data_endpoint_ptr_), SliceBuffer()), std::static_pointer_cast( - event_engine_)); + event_engine_), + std::move(accept_fn)); } - // Create client to server test messages. - std::vector CreateMessages(int num_of_messages) { - std::vector messages; - for (int i = 0; i < num_of_messages; i++) { - SliceBuffer buffer; - buffer.Append( - Slice::FromCopiedString(absl::StrFormat("test message %d", i))); - auto message = arena_->MakePooled(std::move(buffer), 0); - messages.push_back(std::move(message)); - } - return messages; + void AddReadExpectation() { + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<0, 1>( + [](absl::AnyInvocable on_read, + grpc_event_engine::experimental::SliceBuffer* buffer) mutable { + // Construct test frame for EventEngine read: headers (26 + // bytes), message(8 bytes), message padding (56 byte), + // trailers (0 bytes). + const std::string frame_header = { + static_cast(0x80), // frame type = fragment + 0x01, // flag = has header + 0x00, + 0x00, + 0x01, // stream id = 1 + 0x00, + 0x00, + 0x00, + 0x1a, // header length = 26 + 0x00, + 0x00, + 0x00, + 0x08, // message length = 8 + 0x00, + 0x00, + 0x00, + 0x38, // message padding =56 + 0x00, + 0x00, + 0x00, + 0x00, // trailer length = 0 + 0x00, + 0x00, + 0x00}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(frame_header)); + buffer->Append(std::move(slice)); + return true; + })); + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<1>( + [](grpc_event_engine::experimental::SliceBuffer* buffer) { + // Encoded string of header ":path: /demo.Service/Step". + const std::string header = { + 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, + 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(header)); + buffer->Append(std::move(slice)); + return true; + })); + // EXPECT_CALL(control_endpoint_, Read) + // .InSequence(control_endpoint_sequence) + // .WillOnce(Return(false)); + EXPECT_CALL(data_endpoint_, Read) + .InSequence(data_endpoint_sequence) + .WillOnce(WithArgs<1>( + [this](grpc_event_engine::experimental::SliceBuffer* buffer) { + const std::string message_padding = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(message_padding + message_)); + buffer->Append(std::move(slice)); + return true; + })); } private: @@ -163,91 +210,8 @@ class ServerTransportTest : public ::testing::Test { MockEndpoint* data_endpoint_ptr_; size_t initial_arena_size = 1024; MemoryAllocator memory_allocator_; - Sequence control_endpoint_write_sequence; - Sequence control_endpoint_read_sequence; - Sequence data_endpoint_write_sequence; - Sequence data_endpoint_read_sequence; - - void AddReadExpectations(int stream_id, int successful_read_messages, - bool failed_at_last) { - if (successful_read_messages > 0) { - // Transport starts read. - for (int i = 1; i <= successful_read_messages; i++) { - // Only last message will return trailer. - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_read_sequence) - .WillOnce(WithArgs<1>( - [stream_id](grpc_event_engine::experimental::SliceBuffer* - buffer) mutable { - // Construct test frame for EventEngine read: headers (15 - // bytes), message(16 bytes), message padding (48 byte), - // trailers (15 bytes). - const std::string frame_header = { - static_cast(0x80), // frame type = fragment - static_cast(0x01), // flag = has header - 0x00, - 0x00, - static_cast(stream_id), // stream id >= 1 - 0x00, - 0x00, - 0x00, - 0x1a, // header length = 26 - 0x00, - 0x00, - 0x00, - 0x08, // message length = 8 - 0x00, - 0x00, - 0x00, - 0x38, // message padding =56 - 0x00, - 0x00, - 0x00, - static_cast(0x00), // trailer length = 0 - 0x00, - 0x00, - 0x00}; - // Schedule mock_endpoint to read buffer. - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(frame_header)); - buffer->Append(std::move(slice)); - return true; - })); - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_read_sequence) - .WillOnce(WithArgs<1>( - [](grpc_event_engine::experimental::SliceBuffer* buffer) { - // Encoded string of header ":path: /demo.Service/Step". - const std::string header = { - 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, - 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; - // Schedule mock_endpoint to read buffer. - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(header)); - buffer->Append(std::move(slice)); - return true; - })); - EXPECT_CALL(data_endpoint_, Read) - .InSequence(data_endpoint_read_sequence) - .WillOnce(WithArgs<1>( - [this](grpc_event_engine::experimental::SliceBuffer* buffer) { - const std::string message_padding = { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(message_padding + message)); - buffer->Append(std::move(slice)); - return true; - })); - } - } - } + Sequence control_endpoint_sequence; + Sequence data_endpoint_sequence; protected: MockEndpoint& control_endpoint_; @@ -256,19 +220,15 @@ class ServerTransportTest : public ::testing::Test { event_engine_; std::unique_ptr server_transport_; ScopedArenaPtr arena_; - absl::AnyInvocable(CallArgs)> - start_receive_callback_ = [](CallArgs call_args) { - return Seq(call_args.client_to_server_messages->Next(), - [] { return ServerMetadataFromStatus(absl::OkStatus()); }); - }; + Pipe pipe_client_to_server_messages_; + Pipe pipe_server_to_client_messages_; + Pipe pipe_server_intial_metadata_; // Added to verify received message payload. - const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; + const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; }; TEST_F(ServerTransportTest, ReadOneMessage) { - AddExpectations(/*num_of_streams*/ 1, /*successful_write_messages*/ 1, - /*successful_read_messages*/ 1, /*expect_write_failed*/ false, - /*expect_read_failed*/ false); + AddReadExpectation(); StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); auto activity = MakeActivity(