diff --git a/CMakeLists.txt b/CMakeLists.txt index 71f4d86d309..4d4512af35c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -919,6 +919,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx client_ssl_test) endif() add_dependencies(buildtests_cxx client_streaming_test) + add_dependencies(buildtests_cxx client_transport_test) add_dependencies(buildtests_cxx cmdline_test) add_dependencies(buildtests_cxx codegen_test_full) add_dependencies(buildtests_cxx codegen_test_minimal) @@ -9195,6 +9196,49 @@ target_link_libraries(client_streaming_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(client_transport_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + src/core/ext/transport/chaotic_good/client_transport.cc + src/core/ext/transport/chaotic_good/frame.cc + src/core/ext/transport/chaotic_good/frame_header.cc + src/core/lib/transport/promise_endpoint.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/chaotic_good/client_transport_test.cc +) +target_compile_features(client_transport_test PUBLIC cxx_std_14) +target_include_directories(client_transport_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(client_transport_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f74c2a16850..dea3a4fb7b6 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6935,6 +6935,35 @@ targets: - grpc_authorization_provider - grpc_unsecure - grpc_test_util +- name: client_transport_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/chaotic_good/client_transport.h + - src/core/ext/transport/chaotic_good/frame.h + - src/core/ext/transport/chaotic_good/frame_header.h + - src/core/lib/promise/detail/join_state.h + - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/join.h + - src/core/lib/promise/mpsc.h + - src/core/lib/promise/wait_set.h + - src/core/lib/transport/promise_endpoint.h + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/promise/test_wakeup_schedulers.h + src: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto + - src/core/ext/transport/chaotic_good/client_transport.cc + - src/core/ext/transport/chaotic_good/frame.cc + - src/core/ext/transport/chaotic_good/frame_header.cc + - src/core/lib/transport/promise_endpoint.cc + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + - test/core/transport/chaotic_good/client_transport_test.cc + deps: + - gtest + - protobuf + - grpc_test_util + uses_polling: false - name: cmdline_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 44458014cc0..d2941ecb7f8 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5976,6 +5976,43 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "chaotic_good_client_transport", + srcs = [ + "ext/transport/chaotic_good/client_transport.cc", + ], + hdrs = [ + "ext/transport/chaotic_good/client_transport.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/status", + "absl/status:statusor", + "absl/types:variant", + ], + language = "c++", + deps = [ + "activity", + "chaotic_good_frame", + "chaotic_good_frame_header", + "event_engine_wakeup_scheduler", + "for_each", + "grpc_promise_endpoint", + "join", + "loop", + "match", + "mpsc", + "pipe", + "seq", + "slice", + "slice_buffer", + "//:gpr", + "//:gpr_platform", + "//:grpc_base", + "//:hpack_encoder", + ], +) + ### UPB Targets grpc_upb_proto_library( diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc new file mode 100644 index 00000000000..88364499d2c --- /dev/null +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -0,0 +1,120 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/ext/transport/chaotic_good/client_transport.h" + +#include +#include +#include + +#include "absl/status/statusor.h" + +#include +#include +#include + +#include "src/core/ext/transport/chaotic_good/frame.h" +#include "src/core/ext/transport/chaotic_good/frame_header.h" +#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/lib/gprpp/match.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" +#include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/slice/slice.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/transport/promise_endpoint.h" + +namespace grpc_core { +namespace chaotic_good { + +ClientTransport::ClientTransport( + std::unique_ptr control_endpoint, + std::unique_ptr data_endpoint, + std::shared_ptr event_engine) + : outgoing_frames_(MpscReceiver(4)), + control_endpoint_(std::move(control_endpoint)), + data_endpoint_(std::move(data_endpoint)), + control_endpoint_write_buffer_(SliceBuffer()), + data_endpoint_write_buffer_(SliceBuffer()), + hpack_compressor_(std::make_unique()), + event_engine_(event_engine) { + auto write_loop = Loop([this] { + return Seq( + // Get next outgoing frame. + this->outgoing_frames_.Next(), + // Construct data buffers that will be sent to the endpoints. + [this](ClientFrame client_frame) { + MatchMutable( + &client_frame, + [this](ClientFragmentFrame* frame) mutable { + control_endpoint_write_buffer_.Append( + frame->Serialize(hpack_compressor_.get())); + if (frame->message != nullptr) { + auto frame_header = + FrameHeader::Parse( + reinterpret_cast(GRPC_SLICE_START_PTR( + control_endpoint_write_buffer_.c_slice_buffer() + ->slices[0]))) + .value(); + std::string message_padding(frame_header.message_padding, + '0'); + Slice slice(grpc_slice_from_cpp_string(message_padding)); + // Append message payload to data_endpoint_buffer. + data_endpoint_write_buffer_.Append(std::move(slice)); + // Append message payload to data_endpoint_buffer. + frame->message->payload()->MoveFirstNBytesIntoSliceBuffer( + frame->message->payload()->Length(), + data_endpoint_write_buffer_); + } + }, + [this](CancelFrame* frame) mutable { + control_endpoint_write_buffer_.Append( + frame->Serialize(hpack_compressor_.get())); + }); + return absl::OkStatus(); + }, + // Write buffers to corresponding endpoints concurrently. + [this]() { + return Join(this->control_endpoint_->Write( + std::move(control_endpoint_write_buffer_)), + this->data_endpoint_->Write( + std::move(data_endpoint_write_buffer_))); + }, + // Finish writes and return status. + [](std::tuple ret) + -> LoopCtl { + // If writes failed, return failure status. + if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { + // TODO(ladynana): handle the promise endpoint write failures with + // closing the transport. + return absl::InternalError("Promise endpoint writes failed."); + } + return Continue(); + }); + }); + writer_ = MakeActivity( + // Continuously write next outgoing frames to promise endpoints. + std::move(write_loop), EventEngineWakeupScheduler(event_engine_), + [](absl::Status status) { + GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || + status.code() == absl::StatusCode::kInternal); + }); +} + +} // namespace chaotic_good +} // namespace grpc_core diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h new file mode 100644 index 00000000000..d8e9cca3632 --- /dev/null +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -0,0 +1,115 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H +#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H + +#include + +#include + +#include // IWYU pragma: keep +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/status/status.h" +#include "absl/types/variant.h" + +#include + +#include "src/core/ext/transport/chaotic_good/frame.h" +#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/mpsc.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/transport/promise_endpoint.h" +#include "src/core/lib/transport/transport.h" + +namespace grpc_core { +namespace chaotic_good { + +class ClientTransport { + public: + ClientTransport(std::unique_ptr control_endpoint, + std::unique_ptr data_endpoint, + std::shared_ptr + event_engine); + ~ClientTransport() { + if (writer_ != nullptr) { + writer_.reset(); + } + } + auto AddStream(CallArgs call_args) { + // At this point, the connection is set up. + // Start sending data frames. + uint64_t stream_id; + { + MutexLock lock(&mu_); + stream_id = next_stream_id_++; + } + return Seq( + // Continuously send data frame with client to server messages. + ForEach(std::move(*call_args.client_to_server_messages), + [stream_id, initial_frame = true, + client_initial_metadata = + std::move(call_args.client_initial_metadata), + outgoing_frames = outgoing_frames_.MakeSender()]( + MessageHandle result) mutable { + ClientFragmentFrame frame; + frame.stream_id = stream_id; + frame.message = std::move(result); + if (initial_frame) { + // Send initial frame with client intial metadata. + frame.headers = std::move(client_initial_metadata); + initial_frame = false; + } + return Seq( + outgoing_frames.Send(ClientFrame(std::move(frame))), + [](bool success) -> absl::Status { + if (!success) { + return absl::InternalError( + "Send frame to outgoing_frames failed."); + } + return absl::OkStatus(); + }); + })); + } + + private: + // Max buffer is set to 4, so that for stream writes each time it will queue + // at most 2 frames. + MpscReceiver outgoing_frames_; + Mutex mu_; + uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; + ActivityPtr writer_; + ActivityPtr reader_; + std::unique_ptr control_endpoint_; + std::unique_ptr data_endpoint_; + SliceBuffer control_endpoint_write_buffer_; + SliceBuffer data_endpoint_write_buffer_; + std::unique_ptr hpack_compressor_; + // Use to synchronize writer_ and reader_ activity with outside activities; + std::shared_ptr event_engine_; +}; + +} // namespace chaotic_good +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H \ No newline at end of file diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 25df8d3442e..03b53d195f1 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -75,6 +75,7 @@ struct ClientFragmentFrame final : public FrameInterface { uint32_t stream_id; ClientMetadataHandle headers; + MessageHandle message; bool end_of_stream = false; bool operator==(const ClientFragmentFrame& other) const { diff --git a/src/core/lib/transport/promise_endpoint.h b/src/core/lib/transport/promise_endpoint.h index d80071703d6..50358fceb1b 100644 --- a/src/core/lib/transport/promise_endpoint.h +++ b/src/core/lib/transport/promise_endpoint.h @@ -51,6 +51,9 @@ class PromiseEndpoint { endpoint, SliceBuffer already_received); ~PromiseEndpoint(); + /// Prevent copying and moving of PromiseEndpoint. + PromiseEndpoint(const PromiseEndpoint&) = delete; + PromiseEndpoint(PromiseEndpoint&&) = delete; // Returns a promise that resolves to a `absl::Status` indicating the result // of the write operation. diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 5323ad3c7e1..6d693256c2d 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -77,3 +77,34 @@ grpc_fuzzer( "//test/core/promise:test_context", ], ) + +grpc_cc_test( + name = "client_transport_test", + srcs = ["client_transport_test.cc"], + external_deps = [ + "absl/functional:any_invocable", + "absl/strings:str_format", + "gtest", + ], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:grpc", + "//:iomgr_timer", + "//:ref_counted_ptr", + "//src/core:activity", + "//src/core:arena", + "//src/core:chaotic_good_client_transport", + "//src/core:join", + "//src/core:memory_quota", + "//src/core:pipe", + "//src/core:resource_quota", + "//src/core:seq", + "//src/core:slice", + "//src/core:slice_buffer", + "//test/core/event_engine/fuzzing_event_engine", + "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + "//test/core/promise:test_wakeup_schedulers", + ], +) diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc new file mode 100644 index 00000000000..b922eaa037a --- /dev/null +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -0,0 +1,394 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/transport/chaotic_good/client_transport.h" + +// IWYU pragma: no_include + +#include + +#include // IWYU pragma: keep +#include +#include +#include // IWYU pragma: keep + +#include "absl/functional/any_invocable.h" +#include "absl/strings/str_format.h" // IWYU pragma: keep +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include +#include +#include +#include + +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "src/core/lib/slice/slice.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" +#include "test/core/promise/test_wakeup_schedulers.h" + +using testing::MockFunction; +using testing::Return; +using testing::StrictMock; +using testing::WithArgs; + +namespace grpc_core { +namespace chaotic_good { +namespace testing { + +class MockEndpoint + : public grpc_event_engine::experimental::EventEngine::Endpoint { + public: + MOCK_METHOD( + bool, Read, + (absl::AnyInvocable on_read, + grpc_event_engine::experimental::SliceBuffer* buffer, + const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs* + args), + (override)); + + MOCK_METHOD( + bool, Write, + (absl::AnyInvocable on_writable, + grpc_event_engine::experimental::SliceBuffer* data, + const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs* + args), + (override)); + + MOCK_METHOD( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, + GetPeerAddress, (), (const, override)); + MOCK_METHOD( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, + GetLocalAddress, (), (const, override)); +}; + +class ClientTransportTest : public ::testing::Test { + public: + ClientTransportTest() + : control_endpoint_ptr_(new StrictMock()), + data_endpoint_ptr_(new StrictMock()), + memory_allocator_( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( + "test")), + control_endpoint_(*control_endpoint_ptr_), + data_endpoint_(*data_endpoint_ptr_), + event_engine_(std::make_shared< + grpc_event_engine::experimental::FuzzingEventEngine>( + []() { + grpc_timer_manager_set_threading(false); + grpc_event_engine::experimental::FuzzingEventEngine::Options + options; + return options; + }(), + fuzzing_event_engine::Actions())), + client_transport_( + std::make_unique( + std::unique_ptr(control_endpoint_ptr_), + SliceBuffer()), + std::make_unique( + std::unique_ptr(data_endpoint_ptr_), + SliceBuffer()), + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), + arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), + pipe_client_to_server_messages_(arena_.get()), + pipe_client_to_server_messages_second_(arena_.get()) {} + + std::vector CreateMessages(int num_of_messages) { + std::vector messages; + for (int i = 0; i < num_of_messages; i++) { + SliceBuffer buffer; + buffer.Append( + Slice::FromCopiedString(absl::StrFormat("test message %d", i))); + auto message = arena_->MakePooled(std::move(buffer), 0); + messages.push_back(std::move(message)); + } + return messages; + } + + private: + MockEndpoint* control_endpoint_ptr_; + MockEndpoint* data_endpoint_ptr_; + size_t initial_arena_size = 1024; + MemoryAllocator memory_allocator_; + + protected: + MockEndpoint& control_endpoint_; + MockEndpoint& data_endpoint_; + std::shared_ptr + event_engine_; + ClientTransport client_transport_; + ScopedArenaPtr arena_; + Pipe pipe_client_to_server_messages_; + // Added for mutliple streams tests. + Pipe pipe_client_to_server_messages_second_; + + const absl::Status kDummyErrorStatus = + absl::ErrnoToStatus(5566, "just an error"); + static constexpr size_t kDummyRequestSize = 5566u; +}; + +TEST_F(ClientTransportTest, AddOneStream) { + auto messages = CreateMessages(1); + ClientMetadataHandle md; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true)); + EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true)); + auto activity = MakeActivity( + Seq( + // Concurrently: send message into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), + // Once complete, verify successful sending and the received value. + [](const std::tuple& ret) { + EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + InlineWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { + auto messages = CreateMessages(1); + ClientMetadataHandle md; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + EXPECT_CALL(control_endpoint_, Write) + .WillOnce( + WithArgs<0>([this](absl::AnyInvocable on_write) { + on_write(this->kDummyErrorStatus); + return false; + })); + EXPECT_CALL(data_endpoint_, Write) + .WillOnce( + WithArgs<0>([this](absl::AnyInvocable on_write) { + on_write(this->kDummyErrorStatus); + return false; + })); + auto activity = MakeActivity( + Seq( + // Concurrently: send message into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), + // Once complete, verify successful sending and the received value. + [](const std::tuple& ret) { + // TODO(ladynana): change these expectations to errors after the + // writer activity closes transport for EE failures. + EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + InlineWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { + auto messages = CreateMessages(3); + ClientMetadataHandle md; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); + EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); + auto activity = MakeActivity( + Seq( + // Concurrently: send messages into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[1])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[2])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), + // Once complete, verify successful sending and the received value. + [](const std::tuple& ret) { + EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + InlineWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddMultipleStreams) { + auto messages = CreateMessages(2); + ClientMetadataHandle md; + auto first_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto second_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); + EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); + auto activity = MakeActivity( + Seq( + // Concurrently: send messages into the pipe, and receive from the + // pipe. + Join( + // Send message to first stream pipe. + Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + // Send message to second stream pipe. + Seq(pipe_client_to_server_messages_second_.sender.Push( + std::move(messages[1])), + [this] { + pipe_client_to_server_messages_second_.sender.Close(); + return absl::OkStatus(); + }), + // Receive message from first stream pipe. + client_transport_.AddStream(std::move(first_stream_args)), + // Receive message from second stream pipe. + client_transport_.AddStream(std::move(second_stream_args))), + // Once complete, verify successful sending and the received value. + [](const std::tuple& ret) { + EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); + EXPECT_TRUE(std::get<3>(ret).ok()); + return absl::OkStatus(); + }), + InlineWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { + auto messages = CreateMessages(6); + ClientMetadataHandle md; + auto first_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto second_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; + StrictMock> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); + EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); + auto activity = MakeActivity( + Seq( + // Concurrently: send messages into the pipe, and receive from the + // pipe. + Join( + // Send messages to first stream pipe. + Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[1])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[2])), + [this] { + pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + // Send messages to second stream pipe. + Seq(pipe_client_to_server_messages_second_.sender.Push( + std::move(messages[3])), + pipe_client_to_server_messages_second_.sender.Push( + std::move(messages[4])), + pipe_client_to_server_messages_second_.sender.Push( + std::move(messages[5])), + [this] { + pipe_client_to_server_messages_second_.sender.Close(); + return absl::OkStatus(); + }), + // Receive messages from first stream pipe. + client_transport_.AddStream(std::move(first_stream_args)), + // Receive messages from second stream pipe. + client_transport_.AddStream(std::move(second_stream_args))), + // Once complete, verify successful sending and the received value. + [](const std::tuple& ret) { + EXPECT_TRUE(std::get<0>(ret).ok()); + EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); + EXPECT_TRUE(std::get<3>(ret).ok()); + return absl::OkStatus(); + }), + InlineWakeupScheduler(), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +} // namespace testing +} // namespace chaotic_good +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + // Must call to create default EventEngine. + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 05be005caf8..7036e823bff 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2183,6 +2183,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "client_transport_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,