From ce75ec23a1a9c5239834b92da4ce0992d367a39c Mon Sep 17 00:00:00 2001 From: nanahpang <31627465+nanahpang@users.noreply.github.com> Date: Mon, 9 Oct 2023 14:09:54 -0700 Subject: [PATCH] [chaotic-good] Initial change of chaotic-good client-read path. (#34191) This is the initial change of chaotic-good client transport read path, which is a following PR of the client transport write path at #33876. There's a pending work of handling endpoint failures in the transport. It will be added after we have the inter-activity pipe with close function. <!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. --> --- CMakeLists.txt | 80 +- build_autogenerated.yaml | 7 +- src/core/BUILD | 17 +- .../chaotic_good/client_transport.cc | 113 ++- .../transport/chaotic_good/client_transport.h | 137 +++- src/core/ext/transport/chaotic_good/frame.cc | 13 +- src/core/ext/transport/chaotic_good/frame.h | 1 + .../ext/transport/chaotic_good/frame_header.h | 4 + src/core/lib/transport/promise_endpoint.cc | 8 +- test/core/transport/chaotic_good/BUILD | 8 +- .../chaotic_good/client_transport_test.cc | 721 +++++++++++++++--- tools/run_tests/generated/tests.json | 6 +- 12 files changed, 916 insertions(+), 199 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 03e508a8a7d..6c74a9835a5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -948,7 +948,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx client_ssl_test) endif() add_dependencies(buildtests_cxx client_streaming_test) - add_dependencies(buildtests_cxx client_transport_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx client_transport_test) + endif() add_dependencies(buildtests_cxx cmdline_test) add_dependencies(buildtests_cxx codegen_test_full) add_dependencies(buildtests_cxx codegen_test_minimal) @@ -9317,47 +9319,49 @@ target_link_libraries(client_streaming_test endif() if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) -add_executable(client_transport_test - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h - ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h - src/core/ext/transport/chaotic_good/client_transport.cc - src/core/ext/transport/chaotic_good/frame.cc - src/core/ext/transport/chaotic_good/frame_header.cc - src/core/lib/transport/promise_endpoint.cc - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc - test/core/transport/chaotic_good/client_transport_test.cc -) -target_compile_features(client_transport_test PUBLIC cxx_std_14) -target_include_directories(client_transport_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) + add_executable(client_transport_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + src/core/ext/transport/chaotic_good/client_transport.cc + src/core/ext/transport/chaotic_good/frame.cc + src/core/ext/transport/chaotic_good/frame_header.cc + src/core/lib/transport/promise_endpoint.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/chaotic_good/client_transport_test.cc + ) + target_compile_features(client_transport_test PUBLIC cxx_std_14) + target_include_directories(client_transport_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) -target_link_libraries(client_transport_test - ${_gRPC_ALLTARGETS_LIBRARIES} - gtest - ${_gRPC_PROTOBUF_LIBRARIES} - grpc_test_util -) + target_link_libraries(client_transport_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util + ) +endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7f914be517c..55605a4c7bf 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7049,12 +7049,13 @@ targets: - src/core/ext/transport/chaotic_good/frame_header.h - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/inter_activity_pipe.h - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h - src/core/lib/transport/promise_endpoint.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h - - test/core/promise/test_wakeup_schedulers.h src: - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - src/core/ext/transport/chaotic_good/client_transport.cc @@ -7067,6 +7068,10 @@ targets: - gtest - protobuf - grpc_test_util + platforms: + - linux + - posix + - mac uses_polling: false - name: cmdline_test gtest: true diff --git a/src/core/BUILD b/src/core/BUILD index 3ac22e4dbc9..6e550e7564e 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -5948,6 +5948,7 @@ grpc_cc_library( "arena", "bitset", "chaotic_good_frame_header", + "context", "no_destruct", "slice", "slice_buffer", @@ -6116,30 +6117,42 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/random", + "absl/random:bit_gen_ref", "absl/status", "absl/status:statusor", + "absl/types:optional", "absl/types:variant", ], language = "c++", deps = [ "activity", + "arena", "chaotic_good_frame", "chaotic_good_frame_header", "event_engine_wakeup_scheduler", "for_each", "grpc_promise_endpoint", - "join", + "if", + "inter_activity_pipe", "loop", "match", + "memory_quota", "mpsc", "pipe", - "seq", + "poll", + "resource_quota", "slice", "slice_buffer", + "try_join", + "try_seq", + "//:exec_ctx", "//:gpr", "//:gpr_platform", "//:grpc_base", "//:hpack_encoder", + "//:hpack_parser", + "//:ref_counted_ptr", ], ) diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 88364499d2c..9091b81ccdd 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -20,6 +20,8 @@ #include <string> #include <tuple> +#include "absl/random/bit_gen_ref.h" +#include "absl/random/random.h" #include "absl/status/statusor.h" #include <grpc/event_engine/event_engine.h> @@ -30,10 +32,14 @@ #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/lib/gprpp/match.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" -#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/try_join.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" @@ -52,9 +58,14 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_(SliceBuffer()), data_endpoint_write_buffer_(SliceBuffer()), hpack_compressor_(std::make_unique<HPackCompressor>()), + hpack_parser_(std::make_unique<HPackParser>()), + memory_allocator_( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( + "client_transport")), + arena_(MakeScopedArena(1024, &memory_allocator_)), event_engine_(event_engine) { auto write_loop = Loop([this] { - return Seq( + return TrySeq( // Get next outgoing frame. this->outgoing_frames_.Next(), // Construct data buffers that will be sent to the endpoints. @@ -71,6 +82,8 @@ ClientTransport::ClientTransport( control_endpoint_write_buffer_.c_slice_buffer() ->slices[0]))) .value(); + // TODO(ladynana): add message_padding calculation by + // accumulating bytes sent. std::string message_padding(frame_header.message_padding, '0'); Slice slice(grpc_slice_from_cpp_string(message_padding)); @@ -90,20 +103,16 @@ ClientTransport::ClientTransport( }, // Write buffers to corresponding endpoints concurrently. [this]() { - return Join(this->control_endpoint_->Write( - std::move(control_endpoint_write_buffer_)), - this->data_endpoint_->Write( - std::move(data_endpoint_write_buffer_))); + return TryJoin( + control_endpoint_->Write( + std::move(control_endpoint_write_buffer_)), + data_endpoint_->Write(std::move(data_endpoint_write_buffer_))); }, - // Finish writes and return status. - [](std::tuple<absl::Status, absl::Status> ret) - -> LoopCtl<absl::Status> { - // If writes failed, return failure status. - if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { - // TODO(ladynana): handle the promise endpoint write failures with - // closing the transport. - return absl::InternalError("Promise endpoint writes failed."); - } + // Finish writes to difference endpoints and continue the loop. + []() -> LoopCtl<absl::Status> { + // The write failures will be caught in TrySeq and exit loop. + // Therefore, only need to return Continue() in the last lambda + // function. return Continue(); }); }); @@ -113,7 +122,79 @@ ClientTransport::ClientTransport( [](absl::Status status) { GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || status.code() == absl::StatusCode::kInternal); - }); + // TODO(ladynana): handle the promise endpoint write failures with + // outgoing_frames.close() once available. + }, + // Hold Arena in activity for GetContext<Arena> usage. + arena_.get()); + auto read_loop = Loop([this] { + return TrySeq( + // Read frame header from control endpoint. + // TODO(ladynana): remove memcpy in ReadSlice. + this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_), + // Read different parts of the server frame from control/data endpoints + // based on frame header. + [this](Slice read_buffer) mutable { + frame_header_ = std::make_shared<FrameHeader>( + FrameHeader::Parse( + reinterpret_cast<const uint8_t*>( + GRPC_SLICE_START_PTR(read_buffer.c_slice()))) + .value()); + // Read header and trailers from control endpoint. + // Read message padding and message from data endpoint. + return TryJoin( + control_endpoint_->Read(frame_header_->GetFrameLength()), + data_endpoint_->Read(frame_header_->message_padding + + frame_header_->message_length)); + }, + // Construct and send the server frame to corresponding stream. + [this](std::tuple<SliceBuffer, SliceBuffer> ret) mutable { + control_endpoint_read_buffer_ = std::move(std::get<0>(ret)); + // Discard message padding and only keep message in data read buffer. + std::get<1>(ret).MoveLastNBytesIntoSliceBuffer( + frame_header_->message_length, data_endpoint_read_buffer_); + ServerFragmentFrame frame; + // Initialized to get this_cpu() info in global_stat(). + ExecCtx exec_ctx; + // Deserialize frame from read buffer. + absl::BitGen bitgen; + auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_, + absl::BitGenRef(bitgen), + control_endpoint_read_buffer_); + GPR_ASSERT(status.ok()); + // Move message into frame. + frame.message = arena_->MakePooled<Message>( + std::move(data_endpoint_read_buffer_), 0); + std::shared_ptr< + InterActivityPipe<ServerFrame, server_frame_queue_size_>::Sender> + sender; + { + MutexLock lock(&mu_); + sender = stream_map_[frame.stream_id]; + } + return sender->Push(ServerFrame(std::move(frame))); + }, + // Check if send frame to corresponding stream successfully. + [](bool ret) -> LoopCtl<absl::Status> { + if (ret) { + // Send incoming frames successfully. + return Continue(); + } else { + return absl::InternalError("Send incoming frames failed."); + } + }); + }); + reader_ = MakeActivity( + // Continuously read next incoming frames from promise endpoints. + std::move(read_loop), EventEngineWakeupScheduler(event_engine_), + [](absl::Status status) { + GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || + status.code() == absl::StatusCode::kInternal); + // TODO(ladynana): handle the promise endpoint read failures with + // iterating stream_map_ and close all the pipes once available. + }, + // Hold Arena in activity for GetContext<Arena> usage. + arena_.get()); } } // namespace chaotic_good diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index d8e9cca3632..548da1681b7 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -19,25 +19,40 @@ #include <stdint.h> +#include <cstddef> #include <initializer_list> // IWYU pragma: keep +#include <map> #include <memory> +#include <tuple> #include <type_traits> #include <utility> #include "absl/base/thread_annotations.h" #include "absl/status/status.h" +#include "absl/types/optional.h" #include "absl/types/variant.h" #include <grpc/event_engine/event_engine.h> +#include <grpc/event_engine/memory_allocator.h> +#include <grpc/support/log.h> #include "src/core/ext/transport/chaotic_good/frame.h" +#include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" +#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/for_each.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/inter_activity_pipe.h" +#include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/pipe.h" -#include "src/core/lib/promise/seq.h" +#include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/try_join.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/transport/promise_endpoint.h" #include "src/core/lib/transport/transport.h" @@ -55,56 +70,124 @@ class ClientTransport { if (writer_ != nullptr) { writer_.reset(); } + if (reader_ != nullptr) { + reader_.reset(); + } } auto AddStream(CallArgs call_args) { // At this point, the connection is set up. // Start sending data frames. uint64_t stream_id; + InterActivityPipe<ServerFrame, server_frame_queue_size_> server_frames; { MutexLock lock(&mu_); stream_id = next_stream_id_++; + stream_map_.insert( + std::pair<uint32_t, + std::shared_ptr<InterActivityPipe< + ServerFrame, server_frame_queue_size_>::Sender>>( + stream_id, std::make_shared<InterActivityPipe< + ServerFrame, server_frame_queue_size_>::Sender>( + std::move(server_frames.sender)))); } - return Seq( - // Continuously send data frame with client to server messages. - ForEach(std::move(*call_args.client_to_server_messages), - [stream_id, initial_frame = true, - client_initial_metadata = - std::move(call_args.client_initial_metadata), - outgoing_frames = outgoing_frames_.MakeSender()]( - MessageHandle result) mutable { - ClientFragmentFrame frame; - frame.stream_id = stream_id; - frame.message = std::move(result); - if (initial_frame) { - // Send initial frame with client intial metadata. - frame.headers = std::move(client_initial_metadata); - initial_frame = false; - } - return Seq( - outgoing_frames.Send(ClientFrame(std::move(frame))), - [](bool success) -> absl::Status { - if (!success) { - return absl::InternalError( - "Send frame to outgoing_frames failed."); - } - return absl::OkStatus(); - }); - })); + return TrySeq( + TryJoin( + // Continuously send client frame with client to server messages. + ForEach(std::move(*call_args.client_to_server_messages), + [stream_id, initial_frame = true, + client_initial_metadata = + std::move(call_args.client_initial_metadata), + outgoing_frames = outgoing_frames_.MakeSender()]( + MessageHandle result) mutable { + ClientFragmentFrame frame; + frame.stream_id = stream_id; + frame.message = std::move(result); + if (initial_frame) { + // Send initial frame with client intial metadata. + frame.headers = std::move(client_initial_metadata); + initial_frame = false; + } + return TrySeq( + outgoing_frames.Send(ClientFrame(std::move(frame))), + [](bool success) -> absl::Status { + if (!success) { + return absl::InternalError( + "Send frame to outgoing_frames failed."); + } + return absl::OkStatus(); + }); + }), + // Continuously receive server frames from endpoints and save + // results to call_args. + Loop([server_initial_metadata = call_args.server_initial_metadata, + server_to_client_messages = + call_args.server_to_client_messages, + receiver = std::move(server_frames.receiver)]() mutable { + return TrySeq( + // Receive incoming server frame. + receiver.Next(), + // Save incomming frame results to call_args. + [server_initial_metadata, server_to_client_messages]( + absl::optional<ServerFrame> server_frame) mutable { + GPR_ASSERT(server_frame.has_value()); + auto frame = std::move( + absl::get<ServerFragmentFrame>(*server_frame)); + return TrySeq( + If((frame.headers != nullptr), + [server_initial_metadata, + headers = std::move(frame.headers)]() mutable { + return server_initial_metadata->Push( + std::move(headers)); + }, + [] { return false; }), + If((frame.message != nullptr), + [server_to_client_messages, + message = std::move(frame.message)]() mutable { + return server_to_client_messages->Push( + std::move(message)); + }, + [] { return false; }), + If((frame.trailers != nullptr), + [trailers = std::move(frame.trailers)]() mutable + -> LoopCtl<ServerMetadataHandle> { + return std::move(trailers); + }, + []() -> LoopCtl<ServerMetadataHandle> { + return Continue(); + })); + }); + })), + [](std::tuple<Empty, ServerMetadataHandle> ret) { + return std::move(std::get<1>(ret)); + }); } private: // Max buffer is set to 4, so that for stream writes each time it will queue // at most 2 frames. MpscReceiver<ClientFrame> outgoing_frames_; + // Queue size of each stream pipe is set to 2, so that for each stream read it + // will queue at most 2 frames. + static const size_t server_frame_queue_size_ = 2; Mutex mu_; uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; + // Map of stream incoming server frames, key is stream_id. + std::map<uint32_t, std::shared_ptr<InterActivityPipe< + ServerFrame, server_frame_queue_size_>::Sender>> + stream_map_ ABSL_GUARDED_BY(mu_); ActivityPtr writer_; ActivityPtr reader_; std::unique_ptr<PromiseEndpoint> control_endpoint_; std::unique_ptr<PromiseEndpoint> data_endpoint_; SliceBuffer control_endpoint_write_buffer_; SliceBuffer data_endpoint_write_buffer_; + SliceBuffer control_endpoint_read_buffer_; + SliceBuffer data_endpoint_read_buffer_; std::unique_ptr<HPackCompressor> hpack_compressor_; + std::unique_ptr<HPackParser> hpack_parser_; + std::shared_ptr<FrameHeader> frame_header_; + MemoryAllocator memory_allocator_; + ScopedArenaPtr arena_; // Use to synchronize writer_ and reader_ activity with outside activities; std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; }; diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc index ac353d6bd7c..ce14614c8ce 100644 --- a/src/core/ext/transport/chaotic_good/frame.cc +++ b/src/core/ext/transport/chaotic_good/frame.cc @@ -31,6 +31,8 @@ #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/promise/context.h" +#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -116,7 +118,9 @@ absl::StatusOr<Arena::PoolPtr<Metadata>> ReadMetadata( absl::BitGenRef bitsrc) { if (!maybe_slices.ok()) return maybe_slices.status(); auto& slices = *maybe_slices; - Arena::PoolPtr<Metadata> metadata; + auto arena = GetContext<Arena>(); + GPR_ASSERT(arena != nullptr); + Arena::PoolPtr<Metadata> metadata = arena->MakePooled<Metadata>(arena); parser->BeginFrame( metadata.get(), std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), @@ -212,11 +216,18 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser, ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveHeaders(), header.stream_id, true, false, bitsrc); if (!r.ok()) return r.status(); + if (r.value() != nullptr) { + headers = std::move(r.value()); + } } if (header.flags.is_set(1)) { auto r = ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveTrailers(), header.stream_id, false, false, bitsrc); + if (!r.ok()) return r.status(); + if (r.value() != nullptr) { + trailers = std::move(r.value()); + } } return deserializer.Finish(); } diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h index 9f3538af71f..7f93f5a0926 100644 --- a/src/core/ext/transport/chaotic_good/frame.h +++ b/src/core/ext/transport/chaotic_good/frame.h @@ -96,6 +96,7 @@ struct ServerFragmentFrame final : public FrameInterface { uint32_t stream_id; ServerMetadataHandle headers; + MessageHandle message; ServerMetadataHandle trailers; bool operator==(const ServerFragmentFrame& other) const { diff --git a/src/core/ext/transport/chaotic_good/frame_header.h b/src/core/ext/transport/chaotic_good/frame_header.h index a8210cf2574..7834c7d0a83 100644 --- a/src/core/ext/transport/chaotic_good/frame_header.h +++ b/src/core/ext/transport/chaotic_good/frame_header.h @@ -17,6 +17,8 @@ #include <grpc/support/port_platform.h> +#include <stddef.h> + #include <cstdint> #include "absl/status/statusor.h" @@ -55,6 +57,8 @@ struct FrameHeader { message_padding == h.message_padding && trailer_length == h.trailer_length; } + // Frame header size is fixed to 24 bytes. + static constexpr size_t frame_header_size_ = 24; }; } // namespace chaotic_good diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index 660d183a647..e9bc70a2e16 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint( } PromiseEndpoint::~PromiseEndpoint() { - // Last write result has not been polled. - GPR_ASSERT(!write_result_.has_value()); - // Last read result has not been polled. - GPR_ASSERT(!read_result_.has_value()); + // Promise endpoint close when last write result has not been polled. + write_result_.reset(); + // Promise endpoint close when last read result has not been polled. + read_result_.reset(); } const grpc_event_engine::experimental::EventEngine::ResolvedAddress& diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 9c7cd464cdb..b0e25425af2 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -87,20 +87,27 @@ grpc_cc_test( srcs = ["client_transport_test.cc"], external_deps = [ "absl/functional:any_invocable", + "absl/status:statusor", "absl/strings:str_format", + "absl/types:optional", "gtest", ], language = "C++", + # TODO(ladynana): remove the no_windows tag. + tags = ["no_windows"], uses_event_engine = False, uses_polling = False, deps = [ "//:grpc", + "//:grpc_public_hdrs", "//:iomgr_timer", "//:ref_counted_ptr", "//src/core:activity", "//src/core:arena", "//src/core:chaotic_good_client_transport", + "//src/core:event_engine_wakeup_scheduler", "//src/core:join", + "//src/core:map", "//src/core:memory_quota", "//src/core:pipe", "//src/core:resource_quota", @@ -109,6 +116,5 @@ grpc_cc_test( "//src/core:slice_buffer", "//test/core/event_engine/fuzzing_event_engine", "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", - "//test/core/promise:test_wakeup_schedulers", ], ) diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index b922eaa037a..f2376d125e2 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -16,27 +16,32 @@ // IWYU pragma: no_include <sys/socket.h> -#include <stdio.h> - #include <algorithm> // IWYU pragma: keep #include <memory> +#include <string> // IWYU pragma: keep #include <tuple> #include <vector> // IWYU pragma: keep #include "absl/functional/any_invocable.h" +#include "absl/status/statusor.h" // IWYU pragma: keep #include "absl/strings/str_format.h" // IWYU pragma: keep +#include "absl/types/optional.h" // IWYU pragma: keep #include "gmock/gmock.h" #include "gtest/gtest.h" #include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/memory_allocator.h> +#include <grpc/event_engine/slice.h> // IWYU pragma: keep #include <grpc/event_engine/slice_buffer.h> #include <grpc/grpc.h> +#include <grpc/status.h> // IWYU pragma: keep #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" #include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/map.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/resource_quota/arena.h" @@ -44,12 +49,14 @@ #include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep +#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" -#include "test/core/promise/test_wakeup_schedulers.h" using testing::MockFunction; using testing::Return; +using testing::Sequence; using testing::StrictMock; using testing::WithArgs; @@ -103,19 +110,117 @@ class ClientTransportTest : public ::testing::Test { return options; }(), fuzzing_event_engine::Actions())), - client_transport_( - std::make_unique<PromiseEndpoint>( - std::unique_ptr<MockEndpoint>(control_endpoint_ptr_), - SliceBuffer()), - std::make_unique<PromiseEndpoint>( - std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), - SliceBuffer()), - std::static_pointer_cast< - grpc_event_engine::experimental::EventEngine>(event_engine_)), arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), pipe_client_to_server_messages_(arena_.get()), - pipe_client_to_server_messages_second_(arena_.get()) {} - + pipe_server_to_client_messages_(arena_.get()), + pipe_server_intial_metadata_(arena_.get()), + pipe_client_to_server_messages_second_(arena_.get()), + pipe_server_to_client_messages_second_(arena_.get()), + pipe_server_intial_metadata_second_(arena_.get()) {} + // Expect how client transport will read from control/data endpoints with a + // test frame. + void AddReadExpectations(int num_of_streams) { + for (int i = 0; i < num_of_streams; i++) { + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<0, 1>( + [this, i](absl::AnyInvocable<void(absl::Status)> on_read, + grpc_event_engine::experimental::SliceBuffer* + buffer) mutable { + // Construct test frame for EventEngine read: headers (15 + // bytes), message(16 bytes), message padding (48 byte), + // trailers (15 bytes). + const std::string frame_header = { + static_cast<char>(0x80), // frame type = fragment + 0x03, // flag = has header + has trailer + 0x00, + 0x00, + static_cast<char>(i + 1), // stream id = 1 + 0x00, + 0x00, + 0x00, + 0x1a, // header length = 26 + 0x00, + 0x00, + 0x00, + 0x08, // message length = 8 + 0x00, + 0x00, + 0x00, + 0x38, // message padding =56 + 0x00, + 0x00, + 0x00, + 0x0f, // trailer length = 15 + 0x00, + 0x00, + 0x00}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(frame_header)); + buffer->Append(std::move(slice)); + // Execute read callback later to control when read starts. + read_callback.push_back(std::move(on_read)); + // Return false to mock EventEngine read not finish. + return false; + })); + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(WithArgs<1>( + [](grpc_event_engine::experimental::SliceBuffer* buffer) { + // Encoded string of header ":path: /demo.Service/Step". + const std::string header = { + 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, + 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; + // Encoded string of trailer "grpc-status: 0". + const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70, + 0x63, 0x2d, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x01, 0x30}; + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(header + trailers)); + buffer->Append(std::move(slice)); + return true; + })); + } + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence) + .WillOnce(Return(false)); + for (int i = 0; i < num_of_streams; i++) { + EXPECT_CALL(data_endpoint_, Read) + .InSequence(data_endpoint_sequence) + .WillOnce(WithArgs<1>( + [this](grpc_event_engine::experimental::SliceBuffer* buffer) { + const std::string message_padding = { + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(message_padding + message)); + buffer->Append(std::move(slice)); + return true; + })); + } + } + // Initial ClientTransport with read expecations + void InitialClientTransport(int num_of_streams) { + // Read expectaions need to be added before transport initialization since + // reader_ activity loop is started in ClientTransport initialization, + AddReadExpectations(num_of_streams); + client_transport_ = std::make_unique<ClientTransport>( + std::make_unique<PromiseEndpoint>( + std::unique_ptr<MockEndpoint>(control_endpoint_ptr_), + SliceBuffer()), + std::make_unique<PromiseEndpoint>( + std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()), + std::static_pointer_cast<grpc_event_engine::experimental::EventEngine>( + event_engine_)); + } + // Create client to server test messages. std::vector<MessageHandle> CreateMessages(int num_of_messages) { std::vector<MessageHandle> messages; for (int i = 0; i < num_of_messages; i++) { @@ -127,103 +232,238 @@ class ClientTransportTest : public ::testing::Test { } return messages; } + // Wait for last stream read to finish. + auto Wait() { + return [this]() mutable -> Poll<Result> { + MutexLock lock(&mu_); + if (last_stream_read_done_) { + return Result{}; + } else { + waker_ = Activity::current()->MakeNonOwningWaker(); + return Pending(); + } + }; + } + // Wake up the pending Wait() promise. + void Wakeup() { + MutexLock lock(&mu_); + last_stream_read_done_ = true; + waker_.Wakeup(); + } private: + struct Result {}; + Mutex mu_; + Waker waker_ ABSL_GUARDED_BY(mu_); + bool last_stream_read_done_ ABSL_GUARDED_BY(mu_) = false; MockEndpoint* control_endpoint_ptr_; MockEndpoint* data_endpoint_ptr_; size_t initial_arena_size = 1024; MemoryAllocator memory_allocator_; + Sequence control_endpoint_sequence; + Sequence data_endpoint_sequence; protected: MockEndpoint& control_endpoint_; MockEndpoint& data_endpoint_; std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> event_engine_; - ClientTransport client_transport_; + std::unique_ptr<ClientTransport> client_transport_; ScopedArenaPtr arena_; Pipe<MessageHandle> pipe_client_to_server_messages_; + Pipe<MessageHandle> pipe_server_to_client_messages_; + Pipe<ServerMetadataHandle> pipe_server_intial_metadata_; // Added for mutliple streams tests. Pipe<MessageHandle> pipe_client_to_server_messages_second_; - - const absl::Status kDummyErrorStatus = - absl::ErrnoToStatus(5566, "just an error"); - static constexpr size_t kDummyRequestSize = 5566u; + Pipe<MessageHandle> pipe_server_to_client_messages_second_; + Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_; + std::vector<absl::AnyInvocable<void(absl::Status)>> read_callback; + // Added to verify received message payload. + const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; }; TEST_F(ClientTransportTest, AddOneStream) { + InitialClientTransport(1); auto messages = CreateMessages(1); ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; StrictMock<MockFunction<void(absl::Status)>> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true)); EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send message into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), + // Concurrently: write and read messages in client transport. + Join( + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + // Add first stream with call_args into client transport. + Seq(Join(client_transport_->AddStream(std::move(args)), + [this]() { + // Concurrently: start read from control endpoints. + read_callback[0](absl::OkStatus()); + return absl::OkStatus(); + }), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream will finish with server trailers: + // "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Receive messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this]() { + // Close pipes after receive message. + pipe_server_to_client_messages_.sender.Close(); + pipe_server_intial_metadata_.sender.Close(); + return absl::OkStatus(); + })), // Once complete, verify successful sending and the received value. - [](const std::tuple<absl::Status, absl::Status>& ret) { + [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler( + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); event_engine_->UnsetGlobalHooks(); } -TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { +TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { + InitialClientTransport(1); auto messages = CreateMessages(1); ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; StrictMock<MockFunction<void(absl::Status)>> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write) .WillOnce( - WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) { - on_write(this->kDummyErrorStatus); + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("control endpoint write failed.")); return false; })); EXPECT_CALL(data_endpoint_, Write) .WillOnce( - WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) { - on_write(this->kDummyErrorStatus); + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("control endpoint write failed.")); return false; })); auto activity = MakeActivity( Seq( - // Concurrently: send message into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), + // Concurrently: write and read messages in client transport. + Join( + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + [this] { + pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + // Add first stream with call_args into client transport. + Seq(Join(client_transport_->AddStream(std::move(args)), + [this]() { + // Start read from endpoints. + read_callback[0](absl::OkStatus()); + return absl::OkStatus(); + }), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream will finish with server trailers: + // "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Receive messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Close pipes after receive message. + pipe_server_to_client_messages_.sender.Close(); + pipe_server_intial_metadata_.sender.Close(); + return absl::OkStatus(); + })), // Once complete, verify successful sending and the received value. - [](const std::tuple<absl::Status, absl::Status>& ret) { + [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) { // TODO(ladynana): change these expectations to errors after the - // writer activity closes transport for EE failures. + // writer activity closes transport for write failures. EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler( + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -231,37 +471,90 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { } TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { + InitialClientTransport(1); auto messages = CreateMessages(3); ClientMetadataHandle md; - auto args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; StrictMock<MockFunction<void(absl::Status)>> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. - Join(Seq(pipe_client_to_server_messages_.sender.Push( - std::move(messages[0])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[1])), - pipe_client_to_server_messages_.sender.Push( - std::move(messages[2])), - [this] { - this->pipe_client_to_server_messages_.sender.Close(); - return absl::OkStatus(); - }), - client_transport_.AddStream(std::move(args))), + // Concurrently: write and read messages in client transport. + Join( + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + Seq(pipe_client_to_server_messages_.sender.Push( + std::move(messages[0])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[1])), + pipe_client_to_server_messages_.sender.Push( + std::move(messages[2])), + [this] { + pipe_client_to_server_messages_.sender.Close(); + return absl::OkStatus(); + }), + // Add first stream with call_args into client transport. + Seq(Join(client_transport_->AddStream(std::move(args)), + [this]() { + // Start read from endpoints. + read_callback[0](absl::OkStatus()); + return absl::OkStatus(); + }), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream finish with trailers "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Receive messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Close pipes after receive message. + pipe_server_to_client_messages_.sender.Close(); + pipe_server_intial_metadata_.sender.Close(); + return absl::OkStatus(); + })), // Once complete, verify successful sending and the received value. - [](const std::tuple<absl::Status, absl::Status>& ret) { + [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); + EXPECT_TRUE(std::get<2>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler( + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -269,51 +562,160 @@ TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { } TEST_F(ClientTransportTest, AddMultipleStreams) { + InitialClientTransport(2); auto messages = CreateMessages(2); - ClientMetadataHandle md; - auto first_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; - auto second_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; + ClientMetadataHandle first_stream_md; + ClientMetadataHandle second_stream_md; + auto first_stream_args = + CallArgs{std::move(first_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + auto second_stream_args = + CallArgs{std::move(second_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_second_.sender, + &pipe_client_to_server_messages_second_.receiver, + &pipe_server_to_client_messages_second_.sender}; StrictMock<MockFunction<void(absl::Status)>> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. + // Concurrently: write and read messages from client transport. Join( - // Send message to first stream pipe. + // Send messages to first stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. Seq(pipe_client_to_server_messages_.sender.Push( std::move(messages[0])), [this] { pipe_client_to_server_messages_.sender.Close(); return absl::OkStatus(); }), - // Send message to second stream pipe. + // Send messages to second stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. Seq(pipe_client_to_server_messages_second_.sender.Push( std::move(messages[1])), [this] { pipe_client_to_server_messages_second_.sender.Close(); return absl::OkStatus(); }), - // Receive message from first stream pipe. - client_transport_.AddStream(std::move(first_stream_args)), - // Receive message from second stream pipe. - client_transport_.AddStream(std::move(second_stream_args))), + // Add first stream with call_args into client transport. + Seq(Join(client_transport_->AddStream( + std::move(first_stream_args)), + [this] { + read_callback[0](absl::OkStatus()); + return absl::OkStatus(); + }), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream finish with trailers "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Add second stream with call_args into client transport. + Seq(Join(client_transport_->AddStream( + std::move(second_stream_args)), + Seq(Wait(), + [this] { + // Wait until first stream read finished to start + // the second read. + read_callback[1](absl::OkStatus()); + return absl::OkStatus(); + })), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream finish with trailers "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Receive first stream's messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Wake up the sencond stream read after first stream read + // finished. + Wakeup(); + // Close pipes after receive message. + pipe_server_to_client_messages_.sender.Close(); + pipe_server_intial_metadata_.sender.Close(); + return absl::OkStatus(); + }), + // Receive second stream's messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_second_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_second_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Close pipes after receive message. + pipe_server_to_client_messages_second_.sender.Close(); + pipe_server_intial_metadata_second_.sender.Close(); + return absl::OkStatus(); + })), // Once complete, verify successful sending and the received value. [](const std::tuple<absl::Status, absl::Status, absl::Status, - absl::Status>& ret) { + absl::Status, absl::Status, absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok()); + EXPECT_TRUE(std::get<4>(ret).ok()); + EXPECT_TRUE(std::get<5>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler( + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); @@ -321,24 +723,35 @@ TEST_F(ClientTransportTest, AddMultipleStreams) { } TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { + InitialClientTransport(2); auto messages = CreateMessages(6); - ClientMetadataHandle md; - auto first_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; - auto second_stream_args = CallArgs{ - std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, - nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; + ClientMetadataHandle first_stream_md; + ClientMetadataHandle second_stream_md; + auto first_stream_args = + CallArgs{std::move(first_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + auto second_stream_args = + CallArgs{std::move(second_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_second_.sender, + &pipe_client_to_server_messages_second_.receiver, + &pipe_server_to_client_messages_second_.sender}; StrictMock<MockFunction<void(absl::Status)>> on_done; EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); auto activity = MakeActivity( Seq( - // Concurrently: send messages into the pipe, and receive from the - // pipe. + // Concurrently: write and read messages in client transport. Join( - // Send messages to first stream pipe. + // Send messages to first stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. Seq(pipe_client_to_server_messages_.sender.Push( std::move(messages[0])), pipe_client_to_server_messages_.sender.Push( @@ -349,7 +762,9 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { pipe_client_to_server_messages_.sender.Close(); return absl::OkStatus(); }), - // Send messages to second stream pipe. + // Send messages to second stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. Seq(pipe_client_to_server_messages_second_.sender.Push( std::move(messages[3])), pipe_client_to_server_messages_second_.sender.Push( @@ -360,20 +775,116 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { pipe_client_to_server_messages_second_.sender.Close(); return absl::OkStatus(); }), - // Receive messages from first stream pipe. - client_transport_.AddStream(std::move(first_stream_args)), - // Receive messages from second stream pipe. - client_transport_.AddStream(std::move(second_stream_args))), + // Add first stream with call_args into client transport. + Seq(Join(client_transport_->AddStream( + std::move(first_stream_args)), + [this] { + read_callback[0](absl::OkStatus()); + return absl::OkStatus(); + }), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream finish with trailers "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Add second stream with call_args into client transport. + Seq(Join(client_transport_->AddStream( + std::move(second_stream_args)), + Seq(Wait(), + [this] { + // Wait until first stream read finished to start + // the second read. + read_callback[1](absl::OkStatus()); + return absl::OkStatus(); + })), + [](std::tuple<absl::StatusOr<ServerMetadataHandle>, + absl::Status> + ret) { + // AddStream finish with trailers "grpc-status:0". + EXPECT_EQ(std::get<0>(ret) + .value() + ->get(GrpcStatusMetadata()) + .value(), + grpc_status_code::GRPC_STATUS_OK); + return absl::OkStatus(); + }), + // Receive first stream's messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Wake up the sencond stream read after first stream read + // finished. + Wakeup(); + // Close pipes after receive message. + pipe_server_to_client_messages_.sender.Close(); + pipe_server_intial_metadata_.sender.Close(); + return absl::OkStatus(); + }), + // Receive second stream's messages from control/data endpoints. + Seq( + // Receive server initial metadata. + Map(pipe_server_intial_metadata_second_.receiver.Next(), + [](NextResult<ServerMetadataHandle> r) { + // Expect value: ":path: /demo.Service/Step" + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value() + ->get_pointer(HttpPathMetadata()) + ->as_string_view(), + "/demo.Service/Step"); + return absl::OkStatus(); + }), + // Receive server to client messages. + Map(pipe_server_to_client_messages_second_.receiver.Next(), + [this](NextResult<MessageHandle> r) { + EXPECT_TRUE(r.has_value()); + EXPECT_EQ(r.value()->payload()->JoinIntoString(), + message); + return absl::OkStatus(); + }), + [this] { + // Close pipes after receive message. + pipe_server_to_client_messages_second_.sender.Close(); + pipe_server_intial_metadata_second_.sender.Close(); + return absl::OkStatus(); + })), // Once complete, verify successful sending and the received value. [](const std::tuple<absl::Status, absl::Status, absl::Status, - absl::Status>& ret) { + absl::Status, absl::Status, absl::Status>& ret) { EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok()); + EXPECT_TRUE(std::get<4>(ret).ok()); + EXPECT_TRUE(std::get<5>(ret).ok()); return absl::OkStatus(); }), - InlineWakeupScheduler(), + EventEngineWakeupScheduler( + std::static_pointer_cast< + grpc_event_engine::experimental::EventEngine>(event_engine_)), [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); // Wait until ClientTransport's internal activities to finish. event_engine_->TickUntilIdle(); diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 4ebf1ca5a1e..284de928758 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2189,8 +2189,7 @@ "ci_platforms": [ "linux", "mac", - "posix", - "windows" + "posix" ], "cpu_cost": 1.0, "exclude_configs": [], @@ -2202,8 +2201,7 @@ "platforms": [ "linux", "mac", - "posix", - "windows" + "posix" ], "uses_polling": false },