From 8f14e37b1195d21188be1cb41f90504efe2b77ee Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 20 Oct 2023 08:19:56 -0700 Subject: [PATCH] Revert "[chaotic-good] Add chaotic good client transport read (roll-forward)" (#34761) Reverts grpc/grpc#34657 --- CMakeLists.txt | 80 +- build_autogenerated.yaml | 8 +- src/core/BUILD | 18 +- .../chaotic_good/client_transport.cc | 114 +-- .../transport/chaotic_good/client_transport.h | 98 +-- src/core/ext/transport/chaotic_good/frame.cc | 6 +- src/core/ext/transport/chaotic_good/frame.h | 1 - src/core/lib/transport/promise_endpoint.cc | 8 +- test/core/transport/chaotic_good/BUILD | 10 +- .../chaotic_good/client_transport_test.cc | 719 +++--------------- tools/run_tests/generated/tests.json | 8 +- 11 files changed, 182 insertions(+), 888 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3490da78430..dc45fc53edb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -948,9 +948,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx client_ssl_test) endif() add_dependencies(buildtests_cxx client_streaming_test) - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - add_dependencies(buildtests_cxx client_transport_test) - endif() + add_dependencies(buildtests_cxx client_transport_test) add_dependencies(buildtests_cxx cmdline_test) add_dependencies(buildtests_cxx codegen_test_full) add_dependencies(buildtests_cxx codegen_test_minimal) @@ -9339,49 +9337,47 @@ target_link_libraries(client_streaming_test endif() if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - add_executable(client_transport_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h - src/core/ext/transport/chaotic_good/client_transport.cc - src/core/ext/transport/chaotic_good/frame.cc - src/core/ext/transport/chaotic_good/frame_header.cc - src/core/lib/transport/promise_endpoint.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc - test/core/transport/chaotic_good/client_transport_test.cc - ) - target_compile_features(client_transport_test PUBLIC cxx_std_14) - target_include_directories(client_transport_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} - ) +add_executable(client_transport_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + src/core/ext/transport/chaotic_good/client_transport.cc + src/core/ext/transport/chaotic_good/frame.cc + src/core/ext/transport/chaotic_good/frame_header.cc + src/core/lib/transport/promise_endpoint.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/chaotic_good/client_transport_test.cc +) +target_compile_features(client_transport_test PUBLIC cxx_std_14) +target_include_directories(client_transport_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) - target_link_libraries(client_transport_test - ${_gRPC_ALLTARGETS_LIBRARIES} - gtest - ${_gRPC_PROTOBUF_LIBRARIES} - grpc_test_util - ) +target_link_libraries(client_transport_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util +) -endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index fc8f23d03f3..1a45cece5e3 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7069,13 +7069,12 @@ targets: - src/core/ext/transport/chaotic_good/frame_header.h - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h - - src/core/lib/promise/inter_activity_pipe.h - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h - src/core/lib/transport/promise_endpoint.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + - test/core/promise/test_wakeup_schedulers.h src: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - src/core/ext/transport/chaotic_good/client_transport.cc @@ -7088,10 +7087,7 @@ targets: - gtest - protobuf - grpc_test_util - platforms: - - linux - - posix - - mac + uses_polling: false - name: cmdline_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 73e9890d8b8..cd0a5ee47bb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6020,7 +6020,6 @@ grpc_cc_library( "arena", "bitset", "chaotic_good_frame_header", - "context", "no_destruct", "slice", "slice_buffer", @@ -6183,42 +6182,29 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", - "absl/random", - "absl/random:bit_gen_ref", "absl/status", - "absl/status:statusor", - "absl/types:optional", "absl/types:variant", ], language = "c++", deps = [ "activity", - "arena", "chaotic_good_frame", "chaotic_good_frame_header", "event_engine_wakeup_scheduler", "for_each", "grpc_promise_endpoint", - "if", - "inter_activity_pipe", + "join", "loop", "match", - "memory_quota", "mpsc", "pipe", - "poll", - "resource_quota", + "seq", "slice", "slice_buffer", - "try_join", - "try_seq", - "//:exec_ctx", "//:gpr", "//:gpr_platform", "//:grpc_base", "//:hpack_encoder", - "//:hpack_parser", - "//:ref_counted_ptr", ], ) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 7b12dea8560..6eb6e4b86ba 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -20,26 +20,17 @@ #include #include -#include "absl/random/bit_gen_ref.h" -#include "absl/random/random.h" -#include "absl/status/statusor.h" - #include -#include #include #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/lib/gprpp/match.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" +#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/loop.h" -#include "src/core/lib/promise/try_join.h" -#include "src/core/lib/resource_quota/arena.h" -#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,14 +49,9 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_(SliceBuffer()), data_endpoint_write_buffer_(SliceBuffer()), hpack_compressor_(std::make_unique()), - hpack_parser_(std::make_unique()), - memory_allocator_( - ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( - "client_transport")), - arena_(MakeScopedArena(1024, &memory_allocator_)), event_engine_(event_engine) { auto write_loop = Loop([this] { - return TrySeq( + return Seq( // Get next outgoing frame. this->outgoing_frames_.Next(), // Construct data buffers that will be sent to the endpoints. @@ -95,16 +81,20 @@ ClientTransport::ClientTransport( }, // Write buffers to corresponding endpoints concurrently. [this]() { - return TryJoin( - control_endpoint_->Write( - std::move(control_endpoint_write_buffer_)), - data_endpoint_->Write(std::move(data_endpoint_write_buffer_))); + return Join(this->control_endpoint_->Write( + std::move(control_endpoint_write_buffer_)), + this->data_endpoint_->Write( + std::move(data_endpoint_write_buffer_))); }, - // Finish writes to difference endpoints and continue the loop. - []() -> LoopCtl { - // The write failures will be caught in TrySeq and exit loop. - // Therefore, only need to return Continue() in the last lambda - // function. + // Finish writes and return status. + [](std::tuple ret) + -> LoopCtl { + // If writes failed, return failure status. + if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { + // TODO(ladynana): handle the promise endpoint write failures with + // closing the transport. + return absl::InternalError("Promise endpoint writes failed."); + } return Continue(); }); }); @@ -114,79 +104,7 @@ ClientTransport::ClientTransport( [](absl::Status status) { GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || status.code() == absl::StatusCode::kInternal); - // TODO(ladynana): handle the promise endpoint write failures with - // outgoing_frames.close() once available. - }, - // Hold Arena in activity for GetContext usage. - arena_.get()); - auto read_loop = Loop([this] { - return TrySeq( - // Read frame header from control endpoint. - // TODO(ladynana): remove memcpy in ReadSlice. - this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_), - // Read different parts of the server frame from control/data endpoints - // based on frame header. - [this](Slice read_buffer) mutable { - frame_header_ = std::make_shared( - FrameHeader::Parse( - reinterpret_cast( - GRPC_SLICE_START_PTR(read_buffer.c_slice()))) - .value()); - // Read header and trailers from control endpoint. - // Read message padding and message from data endpoint. - return TryJoin( - control_endpoint_->Read(frame_header_->GetFrameLength()), - data_endpoint_->Read(frame_header_->message_padding + - frame_header_->message_length)); - }, - // Construct and send the server frame to corresponding stream. - [this](std::tuple ret) mutable { - control_endpoint_read_buffer_ = std::move(std::get<0>(ret)); - // Discard message padding and only keep message in data read buffer. - std::get<1>(ret).MoveLastNBytesIntoSliceBuffer( - frame_header_->message_length, data_endpoint_read_buffer_); - ServerFragmentFrame frame; - // Initialized to get this_cpu() info in global_stat(). - ExecCtx exec_ctx; - // Deserialize frame from read buffer. - absl::BitGen bitgen; - auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_, - absl::BitGenRef(bitgen), - control_endpoint_read_buffer_); - GPR_ASSERT(status.ok()); - // Move message into frame. - frame.message = arena_->MakePooled( - std::move(data_endpoint_read_buffer_), 0); - std::shared_ptr< - InterActivityPipe::Sender> - sender; - { - MutexLock lock(&mu_); - sender = stream_map_[frame.frame_header.stream_id]; - } - return sender->Push(ServerFrame(std::move(frame))); - }, - // Check if send frame to corresponding stream successfully. - [](bool ret) -> LoopCtl { - if (ret) { - // Send incoming frames successfully. - return Continue(); - } else { - return absl::InternalError("Send incoming frames failed."); - } - }); - }); - reader_ = MakeActivity( - // Continuously read next incoming frames from promise endpoints. - std::move(read_loop), EventEngineWakeupScheduler(event_engine_), - [](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); - // TODO(ladynana): handle the promise endpoint read failures with - // iterating stream_map_ and close all the pipes once available. - }, - // Hold Arena in activity for GetContext usage. - arena_.get()); + }); } } // namespace chaotic_good diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 92eb591de5b..9b2129c083f 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -21,38 +21,25 @@ #include #include // IWYU pragma: keep -#include #include -#include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" -#include "absl/types/optional.h" #include "absl/types/variant.h" #include -#include -#include #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" -#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/for_each.h" -#include "src/core/lib/promise/if.h" -#include "src/core/lib/promise/inter_activity_pipe.h" -#include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/poll.h" -#include "src/core/lib/promise/try_join.h" -#include "src/core/lib/promise/try_seq.h" -#include "src/core/lib/resource_quota/arena.h" -#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/promise/seq.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep #include "src/core/lib/transport/promise_endpoint.h" @@ -71,31 +58,18 @@ class ClientTransport { if (writer_ != nullptr) { writer_.reset(); } - if (reader_ != nullptr) { - reader_.reset(); - } } auto AddStream(CallArgs call_args) { // At this point, the connection is set up. // Start sending data frames. uint32_t stream_id; - InterActivityPipe server_frames; { MutexLock lock(&mu_); stream_id = next_stream_id_++; - stream_map_.insert( - std::pair::Sender>>( - stream_id, std::make_shared::Sender>( - std::move(server_frames.sender)))); } - return TrySeq( - TryJoin( - // Continuously send client frame with client to server messages. - ForEach( - std::move(*call_args.client_to_server_messages), + return Seq( + // Continuously send data frame with client to server messages. + ForEach(std::move(*call_args.client_to_server_messages), [stream_id, initial_frame = true, client_initial_metadata = std::move(call_args.client_initial_metadata), @@ -115,7 +89,7 @@ class ClientTransport { frame.headers = std::move(client_initial_metadata); initial_frame = false; } - return TrySeq( + return Seq( outgoing_frames.Send(ClientFrame(std::move(frame))), [](bool success) -> absl::Status { if (!success) { @@ -124,80 +98,24 @@ class ClientTransport { } return absl::OkStatus(); }); - }), - // Continuously receive server frames from endpoints and save - // results to call_args. - Loop([server_initial_metadata = call_args.server_initial_metadata, - server_to_client_messages = - call_args.server_to_client_messages, - receiver = std::move(server_frames.receiver)]() mutable { - return TrySeq( - // Receive incoming server frame. - receiver.Next(), - // Save incomming frame results to call_args. - [server_initial_metadata, server_to_client_messages]( - absl::optional server_frame) mutable { - GPR_ASSERT(server_frame.has_value()); - auto frame = std::move( - absl::get(*server_frame)); - return TrySeq( - If((frame.headers != nullptr), - [server_initial_metadata, - headers = std::move(frame.headers)]() mutable { - return server_initial_metadata->Push( - std::move(headers)); - }, - [] { return false; }), - If((frame.message != nullptr), - [server_to_client_messages, - message = std::move(frame.message)]() mutable { - return server_to_client_messages->Push( - std::move(message)); - }, - [] { return false; }), - If((frame.trailers != nullptr), - [trailers = std::move(frame.trailers)]() mutable - -> LoopCtl { - return std::move(trailers); - }, - []() -> LoopCtl { - return Continue(); - })); - }); - })), - [](std::tuple ret) { - return std::move(std::get<1>(ret)); - }); + })); } private: // Max buffer is set to 4, so that for stream writes each time it will queue // at most 2 frames. MpscReceiver outgoing_frames_; - // Queue size of each stream pipe is set to 2, so that for each stream read it - // will queue at most 2 frames. - static const size_t server_frame_queue_size_ = 2; - // Assigned aligned bytes from setting frame. - size_t aligned_bytes = 64; Mutex mu_; uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; - // Map of stream incoming server frames, key is stream_id. - std::map::Sender>> - stream_map_ ABSL_GUARDED_BY(mu_); + // Assigned aligned bytes from setting frame. + size_t aligned_bytes = 64; ActivityPtr writer_; ActivityPtr reader_; std::unique_ptr control_endpoint_; std::unique_ptr data_endpoint_; SliceBuffer control_endpoint_write_buffer_; SliceBuffer data_endpoint_write_buffer_; - SliceBuffer control_endpoint_read_buffer_; - SliceBuffer data_endpoint_read_buffer_; std::unique_ptr hpack_compressor_; - std::unique_ptr hpack_parser_; - std::shared_ptr frame_header_; - MemoryAllocator memory_allocator_; - ScopedArenaPtr arena_; // Use to synchronize writer_ and reader_ activity with outside activities; std::shared_ptr event_engine_; }; diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index 62c7dae372e..0f8302c54bf 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -32,8 +32,6 @@ #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/status_helper.h" -#include "src/core/lib/promise/context.h" -#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -132,9 +130,7 @@ absl::StatusOr> ReadMetadata( absl::BitGenRef bitsrc) { if (!maybe_slices.ok()) return maybe_slices.status(); auto& slices = *maybe_slices; - auto arena = GetContext(); - GPR_ASSERT(arena != nullptr); - Arena::PoolPtr metadata = arena->MakePooled(arena); + Arena::PoolPtr metadata; parser->BeginFrame( metadata.get(), std::numeric_limits::max(), std::numeric_limits::max(), diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 8e5031802e5..eca8200a1a8 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -97,7 +97,6 @@ struct ServerFragmentFrame final : public FrameInterface { FrameHeader frame_header; ServerMetadataHandle headers; - MessageHandle message; ServerMetadataHandle trailers; bool operator==(const ServerFragmentFrame& other) const { diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index e9bc70a2e16..660d183a647 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint( } PromiseEndpoint::~PromiseEndpoint() { - // Promise endpoint close when last write result has not been polled. - write_result_.reset(); - // Promise endpoint close when last read result has not been polled. - read_result_.reset(); + // Last write result has not been polled. + GPR_ASSERT(!write_result_.has_value()); + // Last read result has not been polled. + GPR_ASSERT(!read_result_.has_value()); } const grpc_event_engine::experimental::EventEngine::ResolvedAddress& diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index e85aea9a45e..9c7cd464cdb 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -87,25 +87,20 @@ grpc_cc_test( srcs = ["client_transport_test.cc"], external_deps = [ "absl/functional:any_invocable", - "absl/status:statusor", "absl/strings:str_format", - "absl/types:optional", "gtest", ], language = "C++", - # TODO(ladynana): remove the no_windows tag. - tags = ["no_windows"], + uses_event_engine = False, + uses_polling = False, deps = [ "//:grpc", - "//:grpc_public_hdrs", "//:iomgr_timer", "//:ref_counted_ptr", "//src/core:activity", "//src/core:arena", "//src/core:chaotic_good_client_transport", - "//src/core:event_engine_wakeup_scheduler", "//src/core:join", - "//src/core:map", "//src/core:memory_quota", "//src/core:pipe", "//src/core:resource_quota", @@ -114,5 +109,6 @@ grpc_cc_test( "//src/core:slice_buffer", "//test/core/event_engine/fuzzing_event_engine", "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + "//test/core/promise:test_wakeup_schedulers", ], ) diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index f2376d125e2..96889082a2c 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -18,30 +18,23 @@ #include // IWYU pragma: keep #include -#include // IWYU pragma: keep #include #include // IWYU pragma: keep #include "absl/functional/any_invocable.h" -#include "absl/status/statusor.h" // IWYU pragma: keep #include "absl/strings/str_format.h" // IWYU pragma: keep -#include "absl/types/optional.h" // IWYU pragma: keep #include "gmock/gmock.h" #include "gtest/gtest.h" #include #include -#include // IWYU pragma: keep #include #include -#include // IWYU pragma: keep #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/promise/activity.h" -#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" #include "src/core/lib/promise/join.h" -#include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/resource_quota/arena.h" @@ -49,14 +42,12 @@ #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" -#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep -#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" +#include "test/core/promise/test_wakeup_schedulers.h" using testing::MockFunction; using testing::Return; -using testing::Sequence; using testing::StrictMock; using testing::WithArgs; @@ -110,117 +101,19 @@ class ClientTransportTest : public ::testing::Test { return options; }(), fuzzing_event_engine::Actions())), + client_transport_( + std::make_unique( + std::unique_ptr(control_endpoint_ptr_), + SliceBuffer()), + std::make_unique( + std::unique_ptr(data_endpoint_ptr_), + SliceBuffer()), + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), pipe_client_to_server_messages_(arena_.get()), - pipe_server_to_client_messages_(arena_.get()), - pipe_server_intial_metadata_(arena_.get()), - pipe_client_to_server_messages_second_(arena_.get()), - pipe_server_to_client_messages_second_(arena_.get()), - pipe_server_intial_metadata_second_(arena_.get()) {} - // Expect how client transport will read from control/data endpoints with a - // test frame. - void AddReadExpectations(int num_of_streams) { - for (int i = 0; i < num_of_streams; i++) { - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_sequence) - .WillOnce(WithArgs<0, 1>( - [this, i](absl::AnyInvocable on_read, - grpc_event_engine::experimental::SliceBuffer* - buffer) mutable { - // Construct test frame for EventEngine read: headers (15 - // bytes), message(16 bytes), message padding (48 byte), - // trailers (15 bytes). - const std::string frame_header = { - static_cast(0x80), // frame type = fragment - 0x03, // flag = has header + has trailer - 0x00, - 0x00, - static_cast(i + 1), // stream id = 1 - 0x00, - 0x00, - 0x00, - 0x1a, // header length = 26 - 0x00, - 0x00, - 0x00, - 0x08, // message length = 8 - 0x00, - 0x00, - 0x00, - 0x38, // message padding =56 - 0x00, - 0x00, - 0x00, - 0x0f, // trailer length = 15 - 0x00, - 0x00, - 0x00}; - // Schedule mock_endpoint to read buffer. - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(frame_header)); - buffer->Append(std::move(slice)); - // Execute read callback later to control when read starts. - read_callback.push_back(std::move(on_read)); - // Return false to mock EventEngine read not finish. - return false; - })); - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_sequence) - .WillOnce(WithArgs<1>( - [](grpc_event_engine::experimental::SliceBuffer* buffer) { - // Encoded string of header ":path: /demo.Service/Step". - const std::string header = { - 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, - 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; - // Encoded string of trailer "grpc-status: 0". - const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70, - 0x63, 0x2d, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x01, 0x30}; - // Schedule mock_endpoint to read buffer. - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(header + trailers)); - buffer->Append(std::move(slice)); - return true; - })); - } - EXPECT_CALL(control_endpoint_, Read) - .InSequence(control_endpoint_sequence) - .WillOnce(Return(false)); - for (int i = 0; i < num_of_streams; i++) { - EXPECT_CALL(data_endpoint_, Read) - .InSequence(data_endpoint_sequence) - .WillOnce(WithArgs<1>( - [this](grpc_event_engine::experimental::SliceBuffer* buffer) { - const std::string message_padding = { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; - grpc_event_engine::experimental::Slice slice( - grpc_slice_from_cpp_string(message_padding + message)); - buffer->Append(std::move(slice)); - return true; - })); - } - } - // Initial ClientTransport with read expecations - void InitialClientTransport(int num_of_streams) { - // Read expectaions need to be added before transport initialization since - // reader_ activity loop is started in ClientTransport initialization, - AddReadExpectations(num_of_streams); - client_transport_ = std::make_unique( - std::make_unique( - std::unique_ptr(control_endpoint_ptr_), - SliceBuffer()), - std::make_unique( - std::unique_ptr(data_endpoint_ptr_), SliceBuffer()), - std::static_pointer_cast( - event_engine_)); - } - // Create client to server test messages. + pipe_client_to_server_messages_second_(arena_.get()) {} + std::vector CreateMessages(int num_of_messages) { std::vector messages; for (int i = 0; i < num_of_messages; i++) { @@ -232,238 +125,103 @@ class ClientTransportTest : public ::testing::Test { } return messages; } - // Wait for last stream read to finish. - auto Wait() { - return [this]() mutable -> Poll { - MutexLock lock(&mu_); - if (last_stream_read_done_) { - return Result{}; - } else { - waker_ = Activity::current()->MakeNonOwningWaker(); - return Pending(); - } - }; - } - // Wake up the pending Wait() promise. - void Wakeup() { - MutexLock lock(&mu_); - last_stream_read_done_ = true; - waker_.Wakeup(); - } private: - struct Result {}; - Mutex mu_; - Waker waker_ ABSL_GUARDED_BY(mu_); - bool last_stream_read_done_ ABSL_GUARDED_BY(mu_) = false; MockEndpoint* control_endpoint_ptr_; MockEndpoint* data_endpoint_ptr_; size_t initial_arena_size = 1024; MemoryAllocator memory_allocator_; - Sequence control_endpoint_sequence; - Sequence data_endpoint_sequence; protected: MockEndpoint& control_endpoint_; MockEndpoint& data_endpoint_; std::shared_ptr event_engine_; - std::unique_ptr client_transport_; + ClientTransport client_transport_; ScopedArenaPtr arena_; Pipe pipe_client_to_server_messages_; - Pipe pipe_server_to_client_messages_; - Pipe pipe_server_intial_metadata_; // Added for mutliple streams tests. Pipe pipe_client_to_server_messages_second_; - Pipe pipe_server_to_client_messages_second_; - Pipe pipe_server_intial_metadata_second_; - std::vector> read_callback; - // Added to verify received message payload. - const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; + + const absl::Status kDummyErrorStatus = + absl::ErrnoToStatus(5566, "just an error"); + static constexpr size_t kDummyRequestSize = 5566u; }; TEST_F(ClientTransportTest, AddOneStream) { - InitialClientTransport(1); auto messages = CreateMessages(1); ClientMetadataHandle md; - auto args = CallArgs{std::move(md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_.sender, - &pipe_client_to_server_messages_.receiver, - &pipe_server_to_client_messages_.sender}; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true)); EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: write and read messages in client transport. - Join( - // Send messages to call_args.client_to_server_messages pipe, - // which will be eventually sent to control/data endpoints. - Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - // Add first stream with call_args into client transport. - Seq(Join(client_transport_->AddStream(std::move(args)), - [this]() { - // Concurrently: start read from control endpoints. - read_callback[0](absl::OkStatus()); - return absl::OkStatus(); - }), - [](std::tuple, - absl::Status> - ret) { - // AddStream will finish with server trailers: - // "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Receive messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this]() { - // Close pipes after receive message. - pipe_server_to_client_messages_.sender.Close(); - pipe_server_intial_metadata_.sender.Close(); - return absl::OkStatus(); - })), + // Concurrently: send message into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { + [](const std::tuple& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); - EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - EventEngineWakeupScheduler( - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), + InlineWakeupScheduler(), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); event_engine_->UnsetGlobalHooks(); } -TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { - InitialClientTransport(1); +TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { auto messages = CreateMessages(1); ClientMetadataHandle md; - auto args = CallArgs{std::move(md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_.sender, - &pipe_client_to_server_messages_.receiver, - &pipe_server_to_client_messages_.sender}; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write) .WillOnce( - WithArgs<0>([](absl::AnyInvocable on_write) { - on_write(absl::InternalError("control endpoint write failed.")); + WithArgs<0>([this](absl::AnyInvocable on_write) { + on_write(this->kDummyErrorStatus); return false; })); EXPECT_CALL(data_endpoint_, Write) .WillOnce( - WithArgs<0>([](absl::AnyInvocable on_write) { - on_write(absl::InternalError("control endpoint write failed.")); + WithArgs<0>([this](absl::AnyInvocable on_write) { + on_write(this->kDummyErrorStatus); return false; })); auto activity = MakeActivity( Seq( - // Concurrently: write and read messages in client transport. - Join( - // Send messages to call_args.client_to_server_messages pipe, - // which will be eventually sent to control/data endpoints. - Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - // Add first stream with call_args into client transport. - Seq(Join(client_transport_->AddStream(std::move(args)), - [this]() { - // Start read from endpoints. - read_callback[0](absl::OkStatus()); - return absl::OkStatus(); - }), - [](std::tuple, - absl::Status> - ret) { - // AddStream will finish with server trailers: - // "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Receive messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Close pipes after receive message. - pipe_server_to_client_messages_.sender.Close(); - pipe_server_intial_metadata_.sender.Close(); - return absl::OkStatus(); - })), + // Concurrently: send message into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { + [](const std::tuple& ret) { // TODO(ladynana): change these expectations to errors after the - // writer activity closes transport for write failures. + // writer activity closes transport for EE failures. EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); - EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - EventEngineWakeupScheduler( - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), + InlineWakeupScheduler(), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -471,90 +229,37 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { } TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { - InitialClientTransport(1); auto messages = CreateMessages(3); ClientMetadataHandle md; - auto args = CallArgs{std::move(md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_.sender, - &pipe_client_to_server_messages_.receiver, - &pipe_server_to_client_messages_.sender}; + auto args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: write and read messages in client transport. - Join( - // Send messages to call_args.client_to_server_messages pipe, - // which will be eventually sent to control/data endpoints. - Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[1])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[2])), - [this] { - pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - // Add first stream with call_args into client transport. - Seq(Join(client_transport_->AddStream(std::move(args)), - [this]() { - // Start read from endpoints. - read_callback[0](absl::OkStatus()); - return absl::OkStatus(); - }), - [](std::tuple, - absl::Status> - ret) { - // AddStream finish with trailers "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Receive messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Close pipes after receive message. - pipe_server_to_client_messages_.sender.Close(); - pipe_server_intial_metadata_.sender.Close(); - return absl::OkStatus(); - })), + // Concurrently: send messages into the pipe, and receive from the + // pipe. + Join(Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[1])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[2])), + [this] { + this->pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + client_transport_.AddStream(std::move(args))), // Once complete, verify successful sending and the received value. - [](const std::tuple& ret) { + [](const std::tuple& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); - EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - EventEngineWakeupScheduler( - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), + InlineWakeupScheduler(), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -562,160 +267,51 @@ TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { } TEST_F(ClientTransportTest, AddMultipleStreams) { - InitialClientTransport(2); auto messages = CreateMessages(2); - ClientMetadataHandle first_stream_md; - ClientMetadataHandle second_stream_md; - auto first_stream_args = - CallArgs{std::move(first_stream_md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_.sender, - &pipe_client_to_server_messages_.receiver, - &pipe_server_to_client_messages_.sender}; - auto second_stream_args = - CallArgs{std::move(second_stream_md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_second_.sender, - &pipe_client_to_server_messages_second_.receiver, - &pipe_server_to_client_messages_second_.sender}; + ClientMetadataHandle md; + auto first_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto second_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: write and read messages from client transport. + // Concurrently: send messages into the pipe, and receive from the + // pipe. Join( - // Send messages to first stream's - // call_args.client_to_server_messages pipe, which will be - // eventually sent to control/data endpoints. + // Send message to first stream pipe. Seq(pipe_client_to_server_messages_.sender.Push( std::move(messages[0])), [this] { pipe_client_to_server_messages_.sender.Close(); return absl::OkStatus(); }), - // Send messages to second stream's - // call_args.client_to_server_messages pipe, which will be - // eventually sent to control/data endpoints. + // Send message to second stream pipe. Seq(pipe_client_to_server_messages_second_.sender.Push( std::move(messages[1])), [this] { pipe_client_to_server_messages_second_.sender.Close(); return absl::OkStatus(); }), - // Add first stream with call_args into client transport. - Seq(Join(client_transport_->AddStream( - std::move(first_stream_args)), - [this] { - read_callback[0](absl::OkStatus()); - return absl::OkStatus(); - }), - [](std::tuple, - absl::Status> - ret) { - // AddStream finish with trailers "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Add second stream with call_args into client transport. - Seq(Join(client_transport_->AddStream( - std::move(second_stream_args)), - Seq(Wait(), - [this] { - // Wait until first stream read finished to start - // the second read. - read_callback[1](absl::OkStatus()); - return absl::OkStatus(); - })), - [](std::tuple, - absl::Status> - ret) { - // AddStream finish with trailers "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Receive first stream's messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Wake up the sencond stream read after first stream read - // finished. - Wakeup(); - // Close pipes after receive message. - pipe_server_to_client_messages_.sender.Close(); - pipe_server_intial_metadata_.sender.Close(); - return absl::OkStatus(); - }), - // Receive second stream's messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_second_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_second_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Close pipes after receive message. - pipe_server_to_client_messages_second_.sender.Close(); - pipe_server_intial_metadata_second_.sender.Close(); - return absl::OkStatus(); - })), + // Receive message from first stream pipe. + client_transport_.AddStream(std::move(first_stream_args)), + // Receive message from second stream pipe. + client_transport_.AddStream(std::move(second_stream_args))), // Once complete, verify successful sending and the received value. [](const std::tuple& ret) { + absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok()); - EXPECT_TRUE(std::get<4>(ret).ok()); - EXPECT_TRUE(std::get<5>(ret).ok()); return absl::OkStatus(); }), - EventEngineWakeupScheduler( - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), + InlineWakeupScheduler(), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -723,35 +319,24 @@ TEST_F(ClientTransportTest, AddMultipleStreams) { } TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { - InitialClientTransport(2); auto messages = CreateMessages(6); - ClientMetadataHandle first_stream_md; - ClientMetadataHandle second_stream_md; - auto first_stream_args = - CallArgs{std::move(first_stream_md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_.sender, - &pipe_client_to_server_messages_.receiver, - &pipe_server_to_client_messages_.sender}; - auto second_stream_args = - CallArgs{std::move(second_stream_md), - ClientInitialMetadataOutstandingToken::Empty(), - nullptr, - &pipe_server_intial_metadata_second_.sender, - &pipe_client_to_server_messages_second_.receiver, - &pipe_server_to_client_messages_second_.sender}; + ClientMetadataHandle md; + auto first_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto second_stream_args = CallArgs{ + std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, + nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; StrictMock> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: write and read messages in client transport. + // Concurrently: send messages into the pipe, and receive from the + // pipe. Join( - // Send messages to first stream's - // call_args.client_to_server_messages pipe, which will be - // eventually sent to control/data endpoints. + // Send messages to first stream pipe. Seq(pipe_client_to_server_messages_.sender.Push( std::move(messages[0])), pipe_client_to_server_messages_.sender.Push( @@ -762,9 +347,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { pipe_client_to_server_messages_.sender.Close(); return absl::OkStatus(); }), - // Send messages to second stream's - // call_args.client_to_server_messages pipe, which will be - // eventually sent to control/data endpoints. + // Send messages to second stream pipe. Seq(pipe_client_to_server_messages_second_.sender.Push( std::move(messages[3])), pipe_client_to_server_messages_second_.sender.Push( @@ -775,116 +358,20 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { pipe_client_to_server_messages_second_.sender.Close(); return absl::OkStatus(); }), - // Add first stream with call_args into client transport. - Seq(Join(client_transport_->AddStream( - std::move(first_stream_args)), - [this] { - read_callback[0](absl::OkStatus()); - return absl::OkStatus(); - }), - [](std::tuple, - absl::Status> - ret) { - // AddStream finish with trailers "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Add second stream with call_args into client transport. - Seq(Join(client_transport_->AddStream( - std::move(second_stream_args)), - Seq(Wait(), - [this] { - // Wait until first stream read finished to start - // the second read. - read_callback[1](absl::OkStatus()); - return absl::OkStatus(); - })), - [](std::tuple, - absl::Status> - ret) { - // AddStream finish with trailers "grpc-status:0". - EXPECT_EQ(std::get<0>(ret) - .value() - ->get(GrpcStatusMetadata()) - .value(), - grpc_status_code::GRPC_STATUS_OK); - return absl::OkStatus(); - }), - // Receive first stream's messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Wake up the sencond stream read after first stream read - // finished. - Wakeup(); - // Close pipes after receive message. - pipe_server_to_client_messages_.sender.Close(); - pipe_server_intial_metadata_.sender.Close(); - return absl::OkStatus(); - }), - // Receive second stream's messages from control/data endpoints. - Seq( - // Receive server initial metadata. - Map(pipe_server_intial_metadata_second_.receiver.Next(), - [](NextResult r) { - // Expect value: ":path: /demo.Service/Step" - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value() - ->get_pointer(HttpPathMetadata()) - ->as_string_view(), - "/demo.Service/Step"); - return absl::OkStatus(); - }), - // Receive server to client messages. - Map(pipe_server_to_client_messages_second_.receiver.Next(), - [this](NextResult r) { - EXPECT_TRUE(r.has_value()); - EXPECT_EQ(r.value()->payload()->JoinIntoString(), - message); - return absl::OkStatus(); - }), - [this] { - // Close pipes after receive message. - pipe_server_to_client_messages_second_.sender.Close(); - pipe_server_intial_metadata_second_.sender.Close(); - return absl::OkStatus(); - })), + // Receive messages from first stream pipe. + client_transport_.AddStream(std::move(first_stream_args)), + // Receive messages from second stream pipe. + client_transport_.AddStream(std::move(second_stream_args))), // Once complete, verify successful sending and the received value. [](const std::tuple& ret) { + absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok()); - EXPECT_TRUE(std::get<4>(ret).ok()); - EXPECT_TRUE(std::get<5>(ret).ok()); return absl::OkStatus(); }), - EventEngineWakeupScheduler( - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), + InlineWakeupScheduler(), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index e89f9827246..c35ba3e2033 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2189,7 +2189,8 @@ "ci_platforms": [ "linux", "mac", - "posix" + "posix", + "windows" ], "cpu_cost": 1.0, "exclude_configs": [], @@ -2201,9 +2202,10 @@ "platforms": [ "linux", "mac", - "posix" + "posix", + "windows" ], - "uses_polling": true + "uses_polling": false }, { "args": [],