diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index a82085a2dc6..a2addbdeaa9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7082,12 +7082,13 @@ targets: - src/core/ext/transport/chaotic_good/frame_header.h - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/inter_activity_pipe.h - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h - src/core/lib/transport/promise_endpoint.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h - - test/core/promise/test_wakeup_schedulers.h src: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - src/core/ext/transport/chaotic_good/client_transport.cc diff --git a/src/core/BUILD b/src/core/BUILD index d5a6b668a65..d0064a86101 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6050,6 +6050,7 @@ grpc_cc_library( "arena", "bitset", "chaotic_good_frame_header", + "context", "no_destruct", "slice", "slice_buffer", @@ -6212,29 +6213,42 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/random", + "absl/random:bit_gen_ref", "absl/status", + "absl/status:statusor", + "absl/types:optional", "absl/types:variant", ], language = "c++", deps = [ "activity", + "arena", "chaotic_good_frame", "chaotic_good_frame_header", "event_engine_wakeup_scheduler", "for_each", "grpc_promise_endpoint", - "join", + "if", + "inter_activity_pipe", "loop", "match", + "memory_quota", "mpsc", "pipe", - "seq", + "poll", + "resource_quota", "slice", "slice_buffer", + "try_join", + "try_seq", + "//:exec_ctx", "//:gpr", "//:gpr_platform", "//:grpc_base", "//:hpack_encoder", + "//:hpack_parser", + "//:ref_counted_ptr", ], ) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 6eb6e4b86ba..70492268816 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -20,17 +20,26 @@ #include #include +#include "absl/random/bit_gen_ref.h" +#include "absl/random/random.h" +#include "absl/status/statusor.h" + #include +#include #include #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/lib/gprpp/match.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" -#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/try_join.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" @@ -49,9 +58,14 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_(SliceBuffer()), data_endpoint_write_buffer_(SliceBuffer()), hpack_compressor_(std::make_unique()), + hpack_parser_(std::make_unique()), + memory_allocator_( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( + "client_transport")), + arena_(MakeScopedArena(1024, &memory_allocator_)), event_engine_(event_engine) { auto write_loop = Loop([this] { - return Seq( + return TrySeq( // Get next outgoing frame. this->outgoing_frames_.Next(), // Construct data buffers that will be sent to the endpoints. @@ -81,20 +95,16 @@ ClientTransport::ClientTransport( }, // Write buffers to corresponding endpoints concurrently. [this]() { - return Join(this->control_endpoint_->Write( - std::move(control_endpoint_write_buffer_)), - this->data_endpoint_->Write( - std::move(data_endpoint_write_buffer_))); + return TryJoin( + control_endpoint_->Write( + std::move(control_endpoint_write_buffer_)), + data_endpoint_->Write(std::move(data_endpoint_write_buffer_))); }, - // Finish writes and return status. - [](std::tuple ret) - -> LoopCtl { - // If writes failed, return failure status. - if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { - // TODO(ladynana): handle the promise endpoint write failures with - // closing the transport. - return absl::InternalError("Promise endpoint writes failed."); - } + // Finish writes to difference endpoints and continue the loop. + []() -> LoopCtl { + // The write failures will be caught in TrySeq and exit loop. + // Therefore, only need to return Continue() in the last lambda + // function. return Continue(); }); }); @@ -104,7 +114,76 @@ ClientTransport::ClientTransport( [](absl::Status status) { GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || status.code() == absl::StatusCode::kInternal); - }); + // TODO(ladynana): handle the promise endpoint write failures with + // outgoing_frames.close() once available. + }, + // Hold Arena in activity for GetContext usage. + arena_.get()); + auto read_loop = Loop([this] { + return TrySeq( + // Read frame header from control endpoint. + // TODO(ladynana): remove memcpy in ReadSlice. + this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_), + // Read different parts of the server frame from control/data endpoints + // based on frame header. + [this](Slice read_buffer) mutable { + frame_header_ = std::make_shared( + FrameHeader::Parse( + reinterpret_cast( + GRPC_SLICE_START_PTR(read_buffer.c_slice()))) + .value()); + // Read header and trailers from control endpoint. + // Read message padding and message from data endpoint. + return TryJoin( + control_endpoint_->Read(frame_header_->GetFrameLength()), + data_endpoint_->Read(frame_header_->message_padding + + frame_header_->message_length)); + }, + // Construct and send the server frame to corresponding stream. + [this](std::tuple ret) mutable { + control_endpoint_read_buffer_ = std::move(std::get<0>(ret)); + // Discard message padding and only keep message in data read buffer. + std::get<1>(ret).MoveLastNBytesIntoSliceBuffer( + frame_header_->message_length, data_endpoint_read_buffer_); + ServerFragmentFrame frame; + // Initialized to get this_cpu() info in global_stat(). + ExecCtx exec_ctx; + // Deserialize frame from read buffer. + absl::BitGen bitgen; + auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_, + absl::BitGenRef(bitgen), + control_endpoint_read_buffer_); + GPR_ASSERT(status.ok()); + // Move message into frame. + frame.message = arena_->MakePooled( + std::move(data_endpoint_read_buffer_), 0); + auto stream_id = frame.frame_header.stream_id; + { + MutexLock lock(&mu_); + return stream_map_[stream_id]->Push(ServerFrame(std::move(frame))); + } + }, + // Check if send frame to corresponding stream successfully. + [](bool ret) -> LoopCtl { + if (ret) { + // Send incoming frames successfully. + return Continue(); + } else { + return absl::InternalError("Send incoming frames failed."); + } + }); + }); + reader_ = MakeActivity( + // Continuously read next incoming frames from promise endpoints. + std::move(read_loop), EventEngineWakeupScheduler(event_engine_), + [](absl::Status status) { + GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || + status.code() == absl::StatusCode::kInternal); + // TODO(ladynana): handle the promise endpoint read failures with + // iterating stream_map_ and close all the pipes once available. + }, + // Hold Arena in activity for GetContext usage. + arena_.get()); } } // namespace chaotic_good diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 9b2129c083f..a2110f11922 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -17,29 +17,42 @@ #include -#include #include +#include #include // IWYU pragma: keep +#include #include +#include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" +#include "absl/types/optional.h" #include "absl/types/variant.h" #include +#include +#include #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/inter_activity_pipe.h" +#include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/try_join.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep #include "src/core/lib/transport/promise_endpoint.h" @@ -58,18 +71,31 @@ class ClientTransport { if (writer_ != nullptr) { writer_.reset(); } + if (reader_ != nullptr) { + reader_.reset(); + } } auto AddStream(CallArgs call_args) { // At this point, the connection is set up. // Start sending data frames. uint32_t stream_id; + InterActivityPipe pipe_server_frames; { MutexLock lock(&mu_); stream_id = next_stream_id_++; + stream_map_.insert( + std::pair::Sender>>( + stream_id, std::make_shared::Sender>( + std::move(pipe_server_frames.sender)))); } - return Seq( - // Continuously send data frame with client to server messages. - ForEach(std::move(*call_args.client_to_server_messages), + return TrySeq( + TryJoin( + // Continuously send client frame with client to server messages. + ForEach( + std::move(*call_args.client_to_server_messages), [stream_id, initial_frame = true, client_initial_metadata = std::move(call_args.client_initial_metadata), @@ -89,7 +115,7 @@ class ClientTransport { frame.headers = std::move(client_initial_metadata); initial_frame = false; } - return Seq( + return TrySeq( outgoing_frames.Send(ClientFrame(std::move(frame))), [](bool success) -> absl::Status { if (!success) { @@ -98,24 +124,86 @@ class ClientTransport { } return absl::OkStatus(); }); - })); + }), + // Continuously receive server frames from endpoints and save + // results to call_args. + Loop([server_initial_metadata = call_args.server_initial_metadata, + server_to_client_messages = + call_args.server_to_client_messages, + receiver = std::move(pipe_server_frames.receiver)]() mutable { + return TrySeq( + // Receive incoming server frame. + receiver.Next(), + // Save incomming frame results to call_args. + [server_initial_metadata, server_to_client_messages]( + absl::optional server_frame) mutable { + GPR_ASSERT(server_frame.has_value()); + auto frame = std::move( + absl::get(*server_frame)); + bool has_headers = (frame.headers != nullptr); + bool has_message = (frame.message != nullptr); + bool has_trailers = (frame.trailers != nullptr); + return TrySeq( + If( + has_headers, + [server_initial_metadata, + headers = std::move(frame.headers)]() mutable { + return server_initial_metadata->Push( + std::move(headers)); + }, + [] { return false; }), + If( + has_message, + [server_to_client_messages, + message = std::move(frame.message)]() mutable { + return server_to_client_messages->Push( + std::move(message)); + }, + [] { return false; }), + If( + has_trailers, + [trailers = std::move(frame.trailers)]() mutable + -> LoopCtl { + return std::move(trailers); + }, + []() -> LoopCtl { + return Continue(); + })); + }); + })), + [](std::tuple ret) { + return std::move(std::get<1>(ret)); + }); } private: // Max buffer is set to 4, so that for stream writes each time it will queue // at most 2 frames. MpscReceiver outgoing_frames_; - Mutex mu_; - uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; + // Queue size of each stream pipe is set to 2, so that for each stream read it + // will queue at most 2 frames. + static const size_t server_frame_queue_size_ = 2; // Assigned aligned bytes from setting frame. size_t aligned_bytes = 64; + Mutex mu_; + uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; + // Map of stream incoming server frames, key is stream_id. + std::map::Sender>> + stream_map_ ABSL_GUARDED_BY(mu_); ActivityPtr writer_; ActivityPtr reader_; std::unique_ptr control_endpoint_; std::unique_ptr data_endpoint_; SliceBuffer control_endpoint_write_buffer_; SliceBuffer data_endpoint_write_buffer_; + SliceBuffer control_endpoint_read_buffer_; + SliceBuffer data_endpoint_read_buffer_; std::unique_ptr hpack_compressor_; + std::unique_ptr hpack_parser_; + std::shared_ptr frame_header_; + MemoryAllocator memory_allocator_; + ScopedArenaPtr arena_; // Use to synchronize writer_ and reader_ activity with outside activities; std::shared_ptr event_engine_; }; diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index a463c2e02b6..2f7f938e7c3 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -32,6 +32,8 @@ #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -134,7 +136,9 @@ absl::StatusOr> ReadMetadata( absl::BitGenRef bitsrc) { if (!maybe_slices.ok()) return maybe_slices.status(); auto& slices = *maybe_slices; - Arena::PoolPtr metadata; + auto arena = GetContext(); + GPR_ASSERT(arena != nullptr); + Arena::PoolPtr metadata = arena->MakePooled(arena); parser->BeginFrame( metadata.get(), std::numeric_limits::max(), std::numeric_limits::max(), diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index eca8200a1a8..8e5031802e5 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -97,6 +97,7 @@ struct ServerFragmentFrame final : public FrameInterface { FrameHeader frame_header; ServerMetadataHandle headers; + MessageHandle message; ServerMetadataHandle trailers; bool operator==(const ServerFragmentFrame& other) const { diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index 660d183a647..e9bc70a2e16 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint( } PromiseEndpoint::~PromiseEndpoint() { - // Last write result has not been polled. - GPR_ASSERT(!write_result_.has_value()); - // Last read result has not been polled. - GPR_ASSERT(!read_result_.has_value()); + // Promise endpoint close when last write result has not been polled. + write_result_.reset(); + // Promise endpoint close when last read result has not been polled. + read_result_.reset(); } const grpc_event_engine::experimental::EventEngine::ResolvedAddress& diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 9c7cd464cdb..036c894e321 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -87,7 +87,9 @@ grpc_cc_test( srcs = ["client_transport_test.cc"], external_deps = [ "absl/functional:any_invocable", + "absl/status:statusor", "absl/strings:str_format", + "absl/types:optional", "gtest", ], language = "C++", @@ -95,12 +97,17 @@ grpc_cc_test( uses_polling = False, deps = [ "//:grpc", + "//:grpc_public_hdrs", "//:iomgr_timer", "//:ref_counted_ptr", "//src/core:activity", "//src/core:arena", "//src/core:chaotic_good_client_transport", + "//src/core:event_engine_wakeup_scheduler", + "//src/core:if", "//src/core:join", + "//src/core:loop", + "//src/core:map", "//src/core:memory_quota", "//src/core:pipe", "//src/core:resource_quota", @@ -109,6 +116,5 @@ grpc_cc_test( "//src/core:slice_buffer", "//test/core/event_engine/fuzzing_event_engine", "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", - "//test/core/promise:test_wakeup_schedulers", ], ) diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 96889082a2c..86bbf578c29 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -18,36 +18,46 @@ #include // IWYU pragma: keep #include +#include // IWYU pragma: keep #include #include // IWYU pragma: keep #include "absl/functional/any_invocable.h" +#include "absl/status/statusor.h" // IWYU pragma: keep #include "absl/strings/str_format.h" // IWYU pragma: keep +#include "absl/types/optional.h" // IWYU pragma: keep #include "gmock/gmock.h" #include "gtest/gtest.h" #include #include +#include // IWYU pragma: keep #include #include +#include // IWYU pragma: keep #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" +#include "src/core/lib/promise/if.h" #include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/resource_quota/resource_quota.h" -#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep +#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" -#include "test/core/promise/test_wakeup_schedulers.h" using testing::MockFunction; using testing::Return; +using testing::Sequence; using testing::StrictMock; using testing::WithArgs; @@ -101,29 +111,187 @@ class ClientTransportTest : public ::testing::Test { return options; }(), fuzzing_event_engine::Actions())), - client_transport_( - std::make_unique( - std::unique_ptr(control_endpoint_ptr_), - SliceBuffer()), - std::make_unique( - std::unique_ptr(data_endpoint_ptr_), - SliceBuffer()), - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), pipe_client_to_server_messages_(arena_.get()), - pipe_client_to_server_messages_second_(arena_.get()) {} - - std::vector CreateMessages(int num_of_messages) { - std::vector messages; - for (int i = 0; i < num_of_messages; i++) { - SliceBuffer buffer; - buffer.Append( - Slice::FromCopiedString(absl::StrFormat("test message %d", i))); - auto message = arena_->MakePooled(std::move(buffer), 0); - messages.push_back(std::move(message)); + pipe_server_to_client_messages_(arena_.get()), + pipe_server_intial_metadata_(arena_.get()), + pipe_client_to_server_messages_second_(arena_.get()), + pipe_server_to_client_messages_second_(arena_.get()), + pipe_server_intial_metadata_second_(arena_.get()) {} + // Expect how client transport will read from control/data endpoints with a + // test frame. + void AddReadExpectations(int num_of_streams) { + for (int i = 0; i < num_of_streams; i++) { + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<0, 1>( + [this, i](absl::AnyInvocable on_read, + grpc_event_engine::experimental::SliceBuffer* + buffer) mutable { + // Construct test frame for EventEngine read: headers (15 + // bytes), message(16 bytes), message padding (48 byte), + // trailers (15 bytes). + const std::string frame_header = { + static_cast(0x80), // frame type = fragment + 0x03, // flag = has header + has trailer + 0x00, + 0x00, + static_cast(i + 1), // stream id = 1 + 0x00, + 0x00, + 0x00, + 0x1a, // header length = 26 + 0x00, + 0x00, + 0x00, + 0x08, // message length = 8 + 0x00, + 0x00, + 0x00, + 0x38, // message padding =56 + 0x00, + 0x00, + 0x00, + 0x0f, // trailer length = 15 + 0x00, + 0x00, + 0x00}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(frame_header)); + buffer->Append(std::move(slice)); + // Execute read callback later to control when read starts. + if (i == 0) { + read_callback_ = std::move(on_read); + // Return false to mock EventEngine read not finish. + return false; + } else { + return true; + } + })); + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<1>( + [](grpc_event_engine::experimental::SliceBuffer* buffer) { + // Encoded string of header ":path: /demo.Service/Step". + const std::string header = { + 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, + 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; + // Encoded string of trailer "grpc-status: 0". + const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70, + 0x63, 0x2d, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x01, 0x30}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(header + trailers)); + buffer->Append(std::move(slice)); + return true; + })); + } + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(Return(false)); + for (int i = 0; i < num_of_streams; i++) { + EXPECT_CALL(data_endpoint_, Read) + .InSequence(data_endpoint_sequence) + .WillOnce(WithArgs<1>( + [this](grpc_event_engine::experimental::SliceBuffer* buffer) { + const std::string message_padding = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(message_padding + message_)); + buffer->Append(std::move(slice)); + return true; + })); } - return messages; + } + // Initial ClientTransport with read expecations + void InitialClientTransport(int num_of_streams) { + // Read expectaions need to be added before transport initialization since + // reader_ activity loop is started in ClientTransport initialization, + AddReadExpectations(num_of_streams); + client_transport_ = std::make_unique( + std::make_unique( + std::unique_ptr(control_endpoint_ptr_), + SliceBuffer()), + std::make_unique( + std::unique_ptr(data_endpoint_ptr_), SliceBuffer()), + event_engine_); + } + // Send messages from client to server. + auto SendClientToServerMessages( + Pipe& pipe_client_to_server_messages, + int num_of_messages) { + return Loop([&pipe_client_to_server_messages, num_of_messages, + this]() mutable { + bool has_message = (num_of_messages > 0); + return If( + has_message, + Seq(pipe_client_to_server_messages.sender.Push( + arena_->MakePooled()), + [&num_of_messages]() -> LoopCtl { + num_of_messages--; + return Continue(); + }), + [&pipe_client_to_server_messages]() mutable -> LoopCtl { + pipe_client_to_server_messages.sender.Close(); + return absl::OkStatus(); + }); + }); + } + // Add stream into client transport, and expect return trailers of + // "grpc-status:code". + auto AddStream(CallArgs args, const grpc_status_code trailers) { + return Seq(client_transport_->AddStream(std::move(args)), + [trailers](ServerMetadataHandle ret) { + // AddStream will finish with server trailers: + // "grpc-status:code". + EXPECT_EQ(ret->get(GrpcStatusMetadata()).value(), trailers); + return trailers; + }); + } + // Start read from control endpoints. + auto StartRead(const absl::Status& read_status) { + return [read_status, this] { + read_callback_(read_status); + return read_status; + }; + } + // Receive messages from server to client. + auto ReceiveServerToClientMessages( + Pipe& pipe_server_intial_metadata, + Pipe& pipe_server_to_client_messages) { + return Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata.receiver.Next(), + [](NextResult r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ( + r.value()->get_pointer(HttpPathMetadata())->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages.receiver.Next(), + [this](NextResult r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), message_); + return absl::OkStatus(); + }), + [&pipe_server_intial_metadata, + &pipe_server_to_client_messages]() mutable { + // Close pipes after receive message. + pipe_server_to_client_messages.sender.Close(); + pipe_server_intial_metadata.sender.Close(); + return absl::OkStatus(); + }); } private: @@ -131,97 +299,65 @@ class ClientTransportTest : public ::testing::Test { MockEndpoint* data_endpoint_ptr_; size_t initial_arena_size = 1024; MemoryAllocator memory_allocator_; + Sequence control_endpoint_sequence; + Sequence data_endpoint_sequence; protected: MockEndpoint& control_endpoint_; MockEndpoint& data_endpoint_; std::shared_ptr event_engine_; - ClientTransport client_transport_; + std::unique_ptr client_transport_; ScopedArenaPtr arena_; Pipe pipe_client_to_server_messages_; + Pipe pipe_server_to_client_messages_; + Pipe pipe_server_intial_metadata_; // Added for mutliple streams tests. Pipe pipe_client_to_server_messages_second_; - - const absl::Status kDummyErrorStatus = - absl::ErrnoToStatus(5566, "just an error"); - static constexpr size_t kDummyRequestSize = 5566u; + Pipe pipe_server_to_client_messages_second_; + Pipe pipe_server_intial_metadata_second_; + absl::AnyInvocable read_callback_; + // Added to verify received message payload. + const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; }; TEST_F(ClientTransportTest, AddOneStream) { - auto messages = CreateMessages(1); + InitialClientTransport(1); ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true)); EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send message into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), - // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { - EXPECT_TRUE(std::get<0>(ret).ok()); - EXPECT_TRUE(std::get<1>(ret).ok()); - return absl::OkStatus(); - }), - InlineWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); - // Wait until ClientTransport's internal activities to finish. - event_engine_->TickUntilIdle(); - event_engine_->UnsetGlobalHooks(); -} - -TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { - auto messages = CreateMessages(1); - ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; - StrictMock> on_done; - EXPECT_CALL(on_done, Call(absl::OkStatus())); - EXPECT_CALL(control_endpoint_, Write) - .WillOnce( - WithArgs<0>([this](absl::AnyInvocable on_write) { - on_write(this->kDummyErrorStatus); - return false; - })); - EXPECT_CALL(data_endpoint_, Write) - .WillOnce( - WithArgs<0>([this](absl::AnyInvocable on_write) { - on_write(this->kDummyErrorStatus); - return false; - })); - auto activity = MakeActivity( - Seq( - // Concurrently: send message into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), + // Concurrently: write and read messages in client transport. + Join( + // Add first stream with call_args into client transport. + AddStream(std::move(args), GRPC_STATUS_OK), + // Start read from control endpoints. + StartRead(absl::OkStatus()), + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_, 1), + // Receive messages from control/data endpoints. + ReceiveServerToClientMessages(pipe_server_intial_metadata_, + pipe_server_to_client_messages_)), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { - // TODO(ladynana): change these expectations to errors after the - // writer activity closes transport for EE failures. - EXPECT_TRUE(std::get<0>(ret).ok()); + [](const std::tuple& ret) { + EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK); EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); + EXPECT_TRUE(std::get<3>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler(event_engine_), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -229,89 +365,42 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { } TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { - auto messages = CreateMessages(3); + InitialClientTransport(1); ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[1])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[2])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), - // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { - EXPECT_TRUE(std::get<0>(ret).ok()); - EXPECT_TRUE(std::get<1>(ret).ok()); - return absl::OkStatus(); - }), - InlineWakeupScheduler(), - [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); - // Wait until ClientTransport's internal activities to finish. - event_engine_->TickUntilIdle(); - event_engine_->UnsetGlobalHooks(); -} - -TEST_F(ClientTransportTest, AddMultipleStreams) { - auto messages = CreateMessages(2); - ClientMetadataHandle md; - auto first_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; - auto second_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; - StrictMock> on_done; - EXPECT_CALL(on_done, Call(absl::OkStatus())); - EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); - EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); - auto activity = MakeActivity( - Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. + // Concurrently: write and read messages in client transport. Join( - // Send message to first stream pipe. - Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - // Send message to second stream pipe. - Seq(pipe_client_to_server_messages_second_.sender.Push( - std::move(messages[1])), - [this] { - pipe_client_to_server_messages_second_.sender.Close(); - return absl::OkStatus(); - }), - // Receive message from first stream pipe. - client_transport_.AddStream(std::move(first_stream_args)), - // Receive message from second stream pipe. - client_transport_.AddStream(std::move(second_stream_args))), + // Add first stream with call_args into client transport. + AddStream(std::move(args), GRPC_STATUS_OK), + // Start read from control endpoints. + StartRead(absl::OkStatus()), + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_, 3), + // Receive messages from control/data endpoints. + ReceiveServerToClientMessages(pipe_server_intial_metadata_, + pipe_server_to_client_messages_)), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { - EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler(event_engine_), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -319,59 +408,63 @@ TEST_F(ClientTransportTest, AddMultipleStreams) { } TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { - auto messages = CreateMessages(6); - ClientMetadataHandle md; - auto first_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; - auto second_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; + InitialClientTransport(2); + ClientMetadataHandle first_stream_md; + ClientMetadataHandle second_stream_md; + auto first_stream_args = + CallArgs{std::move(first_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + auto second_stream_args = + CallArgs{std::move(second_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_second_.sender, + &pipe_client_to_server_messages_second_.receiver, + &pipe_server_to_client_messages_second_.sender}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. + // Concurrently: write and read messages from client transport. + Join( + // Add first stream with call_args into client transport. + AddStream(std::move(first_stream_args), GRPC_STATUS_OK), + // Start read from control endpoints. + StartRead(absl::OkStatus()), + // Send messages to first stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_, 3), + // Receive first stream's messages from control/data endpoints. + ReceiveServerToClientMessages(pipe_server_intial_metadata_, + pipe_server_to_client_messages_)), Join( - // Send messages to first stream pipe. - Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[1])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[2])), - [this] { - pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - // Send messages to second stream pipe. - Seq(pipe_client_to_server_messages_second_.sender.Push( - std::move(messages[3])), - pipe_client_to_server_messages_second_.sender.Push( - std::move(messages[4])), - pipe_client_to_server_messages_second_.sender.Push( - std::move(messages[5])), - [this] { - pipe_client_to_server_messages_second_.sender.Close(); - return absl::OkStatus(); - }), - // Receive messages from first stream pipe. - client_transport_.AddStream(std::move(first_stream_args)), - // Receive messages from second stream pipe. - client_transport_.AddStream(std::move(second_stream_args))), + // Add second stream with call_args into client transport. + AddStream(std::move(second_stream_args), GRPC_STATUS_OK), + // Send messages to second stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_second_, + 3), + // Receive second stream's messages from control/data endpoints. + ReceiveServerToClientMessages( + pipe_server_intial_metadata_second_, + pipe_server_to_client_messages_second_)), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { - EXPECT_TRUE(std::get<0>(ret).ok()); + [](const std::tuple& + ret) { + EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); - EXPECT_TRUE(std::get<3>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler(event_engine_), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle();