[chaotic-good] Initial change of chaotic-good client-read path. (#34191)

This is the initial change of chaotic-good client transport read path,
which is a following PR of the client transport write path at #33876.
There's a pending work of handling endpoint failures in the transport.
It will be added after we have the inter-activity pipe with close
function.
<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34635/head
nanahpang 1 year ago committed by GitHub
parent cd21b57af0
commit ce75ec23a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 7
      build_autogenerated.yaml
  3. 17
      src/core/BUILD
  4. 109
      src/core/ext/transport/chaotic_good/client_transport.cc
  5. 91
      src/core/ext/transport/chaotic_good/client_transport.h
  6. 13
      src/core/ext/transport/chaotic_good/frame.cc
  7. 1
      src/core/ext/transport/chaotic_good/frame.h
  8. 4
      src/core/ext/transport/chaotic_good/frame_header.h
  9. 8
      src/core/lib/transport/promise_endpoint.cc
  10. 8
      test/core/transport/chaotic_good/BUILD
  11. 685
      test/core/transport/chaotic_good/client_transport_test.cc
  12. 6
      tools/run_tests/generated/tests.json

4
CMakeLists.txt generated

@ -948,7 +948,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_ssl_test) add_dependencies(buildtests_cxx client_ssl_test)
endif() endif()
add_dependencies(buildtests_cxx client_streaming_test) add_dependencies(buildtests_cxx client_streaming_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_transport_test) add_dependencies(buildtests_cxx client_transport_test)
endif()
add_dependencies(buildtests_cxx cmdline_test) add_dependencies(buildtests_cxx cmdline_test)
add_dependencies(buildtests_cxx codegen_test_full) add_dependencies(buildtests_cxx codegen_test_full)
add_dependencies(buildtests_cxx codegen_test_minimal) add_dependencies(buildtests_cxx codegen_test_minimal)
@ -9317,6 +9319,7 @@ target_link_libraries(client_streaming_test
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(client_transport_test add_executable(client_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
@ -9358,6 +9361,7 @@ target_link_libraries(client_transport_test
) )
endif()
endif() endif()
if(gRPC_BUILD_TESTS) if(gRPC_BUILD_TESTS)

@ -7049,12 +7049,13 @@ targets:
- src/core/ext/transport/chaotic_good/frame_header.h - src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h - src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h - src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h - src/core/lib/promise/mpsc.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h - src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h - src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/promise/test_wakeup_schedulers.h
src: src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc - src/core/ext/transport/chaotic_good/client_transport.cc
@ -7067,6 +7068,10 @@ targets:
- gtest - gtest
- protobuf - protobuf
- grpc_test_util - grpc_test_util
platforms:
- linux
- posix
- mac
uses_polling: false uses_polling: false
- name: cmdline_test - name: cmdline_test
gtest: true gtest: true

@ -5948,6 +5948,7 @@ grpc_cc_library(
"arena", "arena",
"bitset", "bitset",
"chaotic_good_frame_header", "chaotic_good_frame_header",
"context",
"no_destruct", "no_destruct",
"slice", "slice",
"slice_buffer", "slice_buffer",
@ -6116,30 +6117,42 @@ grpc_cc_library(
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/random",
"absl/random:bit_gen_ref",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/types:optional",
"absl/types:variant", "absl/types:variant",
], ],
language = "c++", language = "c++",
deps = [ deps = [
"activity", "activity",
"arena",
"chaotic_good_frame", "chaotic_good_frame",
"chaotic_good_frame_header", "chaotic_good_frame_header",
"event_engine_wakeup_scheduler", "event_engine_wakeup_scheduler",
"for_each", "for_each",
"grpc_promise_endpoint", "grpc_promise_endpoint",
"join", "if",
"inter_activity_pipe",
"loop", "loop",
"match", "match",
"memory_quota",
"mpsc", "mpsc",
"pipe", "pipe",
"seq", "poll",
"resource_quota",
"slice", "slice",
"slice_buffer", "slice_buffer",
"try_join",
"try_seq",
"//:exec_ctx",
"//:gpr", "//:gpr",
"//:gpr_platform", "//:gpr_platform",
"//:grpc_base", "//:grpc_base",
"//:hpack_encoder", "//:hpack_encoder",
"//:hpack_parser",
"//:ref_counted_ptr",
], ],
) )

@ -20,6 +20,8 @@
#include <string> #include <string>
#include <tuple> #include <tuple>
#include "absl/random/bit_gen_ref.h"
#include "absl/random/random.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
@ -30,10 +32,14 @@
#include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h" #include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
@ -52,9 +58,14 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_(SliceBuffer()), control_endpoint_write_buffer_(SliceBuffer()),
data_endpoint_write_buffer_(SliceBuffer()), data_endpoint_write_buffer_(SliceBuffer()),
hpack_compressor_(std::make_unique<HPackCompressor>()), hpack_compressor_(std::make_unique<HPackCompressor>()),
hpack_parser_(std::make_unique<HPackParser>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"client_transport")),
arena_(MakeScopedArena(1024, &memory_allocator_)),
event_engine_(event_engine) { event_engine_(event_engine) {
auto write_loop = Loop([this] { auto write_loop = Loop([this] {
return Seq( return TrySeq(
// Get next outgoing frame. // Get next outgoing frame.
this->outgoing_frames_.Next(), this->outgoing_frames_.Next(),
// Construct data buffers that will be sent to the endpoints. // Construct data buffers that will be sent to the endpoints.
@ -71,6 +82,8 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_.c_slice_buffer() control_endpoint_write_buffer_.c_slice_buffer()
->slices[0]))) ->slices[0])))
.value(); .value();
// TODO(ladynana): add message_padding calculation by
// accumulating bytes sent.
std::string message_padding(frame_header.message_padding, std::string message_padding(frame_header.message_padding,
'0'); '0');
Slice slice(grpc_slice_from_cpp_string(message_padding)); Slice slice(grpc_slice_from_cpp_string(message_padding));
@ -90,20 +103,16 @@ ClientTransport::ClientTransport(
}, },
// Write buffers to corresponding endpoints concurrently. // Write buffers to corresponding endpoints concurrently.
[this]() { [this]() {
return Join(this->control_endpoint_->Write( return TryJoin(
control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)), std::move(control_endpoint_write_buffer_)),
this->data_endpoint_->Write( data_endpoint_->Write(std::move(data_endpoint_write_buffer_)));
std::move(data_endpoint_write_buffer_)));
}, },
// Finish writes and return status. // Finish writes to difference endpoints and continue the loop.
[](std::tuple<absl::Status, absl::Status> ret) []() -> LoopCtl<absl::Status> {
-> LoopCtl<absl::Status> { // The write failures will be caught in TrySeq and exit loop.
// If writes failed, return failure status. // Therefore, only need to return Continue() in the last lambda
if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { // function.
// TODO(ladynana): handle the promise endpoint write failures with
// closing the transport.
return absl::InternalError("Promise endpoint writes failed.");
}
return Continue(); return Continue();
}); });
}); });
@ -113,7 +122,79 @@ ClientTransport::ClientTransport(
[](absl::Status status) { [](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal); status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint write failures with
// outgoing_frames.close() once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
auto read_loop = Loop([this] {
return TrySeq(
// Read frame header from control endpoint.
// TODO(ladynana): remove memcpy in ReadSlice.
this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_),
// Read different parts of the server frame from control/data endpoints
// based on frame header.
[this](Slice read_buffer) mutable {
frame_header_ = std::make_shared<FrameHeader>(
FrameHeader::Parse(
reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(read_buffer.c_slice())))
.value());
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
return TryJoin(
control_endpoint_->Read(frame_header_->GetFrameLength()),
data_endpoint_->Read(frame_header_->message_padding +
frame_header_->message_length));
},
// Construct and send the server frame to corresponding stream.
[this](std::tuple<SliceBuffer, SliceBuffer> ret) mutable {
control_endpoint_read_buffer_ = std::move(std::get<0>(ret));
// Discard message padding and only keep message in data read buffer.
std::get<1>(ret).MoveLastNBytesIntoSliceBuffer(
frame_header_->message_length, data_endpoint_read_buffer_);
ServerFragmentFrame frame;
// Initialized to get this_cpu() info in global_stat().
ExecCtx exec_ctx;
// Deserialize frame from read buffer.
absl::BitGen bitgen;
auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_,
absl::BitGenRef(bitgen),
control_endpoint_read_buffer_);
GPR_ASSERT(status.ok());
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
std::shared_ptr<
InterActivityPipe<ServerFrame, server_frame_queue_size_>::Sender>
sender;
{
MutexLock lock(&mu_);
sender = stream_map_[frame.stream_id];
}
return sender->Push(ServerFrame(std::move(frame)));
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {
if (ret) {
// Send incoming frames successfully.
return Continue();
} else {
return absl::InternalError("Send incoming frames failed.");
}
}); });
});
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint read failures with
// iterating stream_map_ and close all the pipes once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
} }
} // namespace chaotic_good } // namespace chaotic_good

@ -19,25 +19,40 @@
#include <stdint.h> #include <stdint.h>
#include <cstddef>
#include <initializer_list> // IWYU pragma: keep #include <initializer_list> // IWYU pragma: keep
#include <map>
#include <memory> #include <memory>
#include <tuple>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h" #include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/for_each.h" #include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/mpsc.h" #include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/promise_endpoint.h" #include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
@ -55,17 +70,29 @@ class ClientTransport {
if (writer_ != nullptr) { if (writer_ != nullptr) {
writer_.reset(); writer_.reset();
} }
if (reader_ != nullptr) {
reader_.reset();
}
} }
auto AddStream(CallArgs call_args) { auto AddStream(CallArgs call_args) {
// At this point, the connection is set up. // At this point, the connection is set up.
// Start sending data frames. // Start sending data frames.
uint64_t stream_id; uint64_t stream_id;
InterActivityPipe<ServerFrame, server_frame_queue_size_> server_frames;
{ {
MutexLock lock(&mu_); MutexLock lock(&mu_);
stream_id = next_stream_id_++; stream_id = next_stream_id_++;
stream_map_.insert(
std::pair<uint32_t,
std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>(
stream_id, std::make_shared<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>(
std::move(server_frames.sender))));
} }
return Seq( return TrySeq(
// Continuously send data frame with client to server messages. TryJoin(
// Continuously send client frame with client to server messages.
ForEach(std::move(*call_args.client_to_server_messages), ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true, [stream_id, initial_frame = true,
client_initial_metadata = client_initial_metadata =
@ -80,7 +107,7 @@ class ClientTransport {
frame.headers = std::move(client_initial_metadata); frame.headers = std::move(client_initial_metadata);
initial_frame = false; initial_frame = false;
} }
return Seq( return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))), outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status { [](bool success) -> absl::Status {
if (!success) { if (!success) {
@ -89,22 +116,78 @@ class ClientTransport {
} }
return absl::OkStatus(); return absl::OkStatus();
}); });
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,
server_to_client_messages =
call_args.server_to_client_messages,
receiver = std::move(server_frames.receiver)]() mutable {
return TrySeq(
// Receive incoming server frame.
receiver.Next(),
// Save incomming frame results to call_args.
[server_initial_metadata, server_to_client_messages](
absl::optional<ServerFrame> server_frame) mutable {
GPR_ASSERT(server_frame.has_value());
auto frame = std::move(
absl::get<ServerFragmentFrame>(*server_frame));
return TrySeq(
If((frame.headers != nullptr),
[server_initial_metadata,
headers = std::move(frame.headers)]() mutable {
return server_initial_metadata->Push(
std::move(headers));
},
[] { return false; }),
If((frame.message != nullptr),
[server_to_client_messages,
message = std::move(frame.message)]() mutable {
return server_to_client_messages->Push(
std::move(message));
},
[] { return false; }),
If((frame.trailers != nullptr),
[trailers = std::move(frame.trailers)]() mutable
-> LoopCtl<ServerMetadataHandle> {
return std::move(trailers);
},
[]() -> LoopCtl<ServerMetadataHandle> {
return Continue();
})); }));
});
})),
[](std::tuple<Empty, ServerMetadataHandle> ret) {
return std::move(std::get<1>(ret));
});
} }
private: private:
// Max buffer is set to 4, so that for stream writes each time it will queue // Max buffer is set to 4, so that for stream writes each time it will queue
// at most 2 frames. // at most 2 frames.
MpscReceiver<ClientFrame> outgoing_frames_; MpscReceiver<ClientFrame> outgoing_frames_;
// Queue size of each stream pipe is set to 2, so that for each stream read it
// will queue at most 2 frames.
static const size_t server_frame_queue_size_ = 2;
Mutex mu_; Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1; uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
std::map<uint32_t, std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>
stream_map_ ABSL_GUARDED_BY(mu_);
ActivityPtr writer_; ActivityPtr writer_;
ActivityPtr reader_; ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_; std::unique_ptr<PromiseEndpoint> control_endpoint_;
std::unique_ptr<PromiseEndpoint> data_endpoint_; std::unique_ptr<PromiseEndpoint> data_endpoint_;
SliceBuffer control_endpoint_write_buffer_; SliceBuffer control_endpoint_write_buffer_;
SliceBuffer data_endpoint_write_buffer_; SliceBuffer data_endpoint_write_buffer_;
SliceBuffer control_endpoint_read_buffer_;
SliceBuffer data_endpoint_read_buffer_;
std::unique_ptr<HPackCompressor> hpack_compressor_; std::unique_ptr<HPackCompressor> hpack_compressor_;
std::unique_ptr<HPackParser> hpack_parser_;
std::shared_ptr<FrameHeader> frame_header_;
MemoryAllocator memory_allocator_;
ScopedArenaPtr arena_;
// Use to synchronize writer_ and reader_ activity with outside activities; // Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
}; };

@ -31,6 +31,8 @@
#include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
@ -116,7 +118,9 @@ absl::StatusOr<Arena::PoolPtr<Metadata>> ReadMetadata(
absl::BitGenRef bitsrc) { absl::BitGenRef bitsrc) {
if (!maybe_slices.ok()) return maybe_slices.status(); if (!maybe_slices.ok()) return maybe_slices.status();
auto& slices = *maybe_slices; auto& slices = *maybe_slices;
Arena::PoolPtr<Metadata> metadata; auto arena = GetContext<Arena>();
GPR_ASSERT(arena != nullptr);
Arena::PoolPtr<Metadata> metadata = arena->MakePooled<Metadata>(arena);
parser->BeginFrame( parser->BeginFrame(
metadata.get(), std::numeric_limits<uint32_t>::max(), metadata.get(), std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
@ -212,11 +216,18 @@ absl::Status ServerFragmentFrame::Deserialize(HPackParser* parser,
ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveHeaders(), ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveHeaders(),
header.stream_id, true, false, bitsrc); header.stream_id, true, false, bitsrc);
if (!r.ok()) return r.status(); if (!r.ok()) return r.status();
if (r.value() != nullptr) {
headers = std::move(r.value());
}
} }
if (header.flags.is_set(1)) { if (header.flags.is_set(1)) {
auto r = auto r =
ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveTrailers(), ReadMetadata<ServerMetadata>(parser, deserializer.ReceiveTrailers(),
header.stream_id, false, false, bitsrc); header.stream_id, false, false, bitsrc);
if (!r.ok()) return r.status();
if (r.value() != nullptr) {
trailers = std::move(r.value());
}
} }
return deserializer.Finish(); return deserializer.Finish();
} }

@ -96,6 +96,7 @@ struct ServerFragmentFrame final : public FrameInterface {
uint32_t stream_id; uint32_t stream_id;
ServerMetadataHandle headers; ServerMetadataHandle headers;
MessageHandle message;
ServerMetadataHandle trailers; ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const { bool operator==(const ServerFragmentFrame& other) const {

@ -17,6 +17,8 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <stddef.h>
#include <cstdint> #include <cstdint>
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -55,6 +57,8 @@ struct FrameHeader {
message_padding == h.message_padding && message_padding == h.message_padding &&
trailer_length == h.trailer_length; trailer_length == h.trailer_length;
} }
// Frame header size is fixed to 24 bytes.
static constexpr size_t frame_header_size_ = 24;
}; };
} // namespace chaotic_good } // namespace chaotic_good

@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint(
} }
PromiseEndpoint::~PromiseEndpoint() { PromiseEndpoint::~PromiseEndpoint() {
// Last write result has not been polled. // Promise endpoint close when last write result has not been polled.
GPR_ASSERT(!write_result_.has_value()); write_result_.reset();
// Last read result has not been polled. // Promise endpoint close when last read result has not been polled.
GPR_ASSERT(!read_result_.has_value()); read_result_.reset();
} }
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& const grpc_event_engine::experimental::EventEngine::ResolvedAddress&

@ -87,20 +87,27 @@ grpc_cc_test(
srcs = ["client_transport_test.cc"], srcs = ["client_transport_test.cc"],
external_deps = [ external_deps = [
"absl/functional:any_invocable", "absl/functional:any_invocable",
"absl/status:statusor",
"absl/strings:str_format", "absl/strings:str_format",
"absl/types:optional",
"gtest", "gtest",
], ],
language = "C++", language = "C++",
# TODO(ladynana): remove the no_windows tag.
tags = ["no_windows"],
uses_event_engine = False, uses_event_engine = False,
uses_polling = False, uses_polling = False,
deps = [ deps = [
"//:grpc", "//:grpc",
"//:grpc_public_hdrs",
"//:iomgr_timer", "//:iomgr_timer",
"//:ref_counted_ptr", "//:ref_counted_ptr",
"//src/core:activity", "//src/core:activity",
"//src/core:arena", "//src/core:arena",
"//src/core:chaotic_good_client_transport", "//src/core:chaotic_good_client_transport",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:join", "//src/core:join",
"//src/core:map",
"//src/core:memory_quota", "//src/core:memory_quota",
"//src/core:pipe", "//src/core:pipe",
"//src/core:resource_quota", "//src/core:resource_quota",
@ -109,6 +116,5 @@ grpc_cc_test(
"//src/core:slice_buffer", "//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine", "//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/promise:test_wakeup_schedulers",
], ],
) )

@ -16,27 +16,32 @@
// IWYU pragma: no_include <sys/socket.h> // IWYU pragma: no_include <sys/socket.h>
#include <stdio.h>
#include <algorithm> // IWYU pragma: keep #include <algorithm> // IWYU pragma: keep
#include <memory> #include <memory>
#include <string> // IWYU pragma: keep
#include <tuple> #include <tuple>
#include <vector> // IWYU pragma: keep #include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep #include "absl/strings/str_format.h" // IWYU pragma: keep
#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h> #include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
@ -44,12 +49,14 @@
#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction; using testing::MockFunction;
using testing::Return; using testing::Return;
using testing::Sequence;
using testing::StrictMock; using testing::StrictMock;
using testing::WithArgs; using testing::WithArgs;
@ -103,19 +110,117 @@ class ClientTransportTest : public ::testing::Test {
return options; return options;
}(), }(),
fuzzing_event_engine::Actions())), fuzzing_event_engine::Actions())),
client_transport_( arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()),
pipe_server_to_client_messages_second_(arena_.get()),
pipe_server_intial_metadata_second_(arena_.get()) {}
// Expect how client transport will read from control/data endpoints with a
// test frame.
void AddReadExpectations(int num_of_streams) {
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<0, 1>(
[this, i](absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer*
buffer) mutable {
// Construct test frame for EventEngine read: headers (15
// bytes), message(16 bytes), message padding (48 byte),
// trailers (15 bytes).
const std::string frame_header = {
static_cast<char>(0x80), // frame type = fragment
0x03, // flag = has header + has trailer
0x00,
0x00,
static_cast<char>(i + 1), // stream id = 1
0x00,
0x00,
0x00,
0x1a, // header length = 26
0x00,
0x00,
0x00,
0x08, // message length = 8
0x00,
0x00,
0x00,
0x38, // message padding =56
0x00,
0x00,
0x00,
0x0f, // trailer length = 15
0x00,
0x00,
0x00};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(frame_header));
buffer->Append(std::move(slice));
// Execute read callback later to control when read starts.
read_callback.push_back(std::move(on_read));
// Return false to mock EventEngine read not finish.
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<1>(
[](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Encoded string of header ":path: /demo.Service/Step".
const std::string header = {
0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
// Encoded string of trailer "grpc-status: 0".
const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70,
0x63, 0x2d, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x01, 0x30};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(header + trailers));
buffer->Append(std::move(slice));
return true;
}));
}
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(Return(false));
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(data_endpoint_, Read)
.InSequence(data_endpoint_sequence)
.WillOnce(WithArgs<1>(
[this](grpc_event_engine::experimental::SliceBuffer* buffer) {
const std::string message_padding = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(message_padding + message));
buffer->Append(std::move(slice));
return true;
}));
}
}
// Initial ClientTransport with read expecations
void InitialClientTransport(int num_of_streams) {
// Read expectaions need to be added before transport initialization since
// reader_ activity loop is started in ClientTransport initialization,
AddReadExpectations(num_of_streams);
client_transport_ = std::make_unique<ClientTransport>(
std::make_unique<PromiseEndpoint>( std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_), std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()), SliceBuffer()),
std::make_unique<PromiseEndpoint>( std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
SliceBuffer()), std::static_pointer_cast<grpc_event_engine::experimental::EventEngine>(
std::static_pointer_cast< event_engine_));
grpc_event_engine::experimental::EventEngine>(event_engine_)), }
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), // Create client to server test messages.
pipe_client_to_server_messages_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()) {}
std::vector<MessageHandle> CreateMessages(int num_of_messages) { std::vector<MessageHandle> CreateMessages(int num_of_messages) {
std::vector<MessageHandle> messages; std::vector<MessageHandle> messages;
for (int i = 0; i < num_of_messages; i++) { for (int i = 0; i < num_of_messages; i++) {
@ -127,103 +232,238 @@ class ClientTransportTest : public ::testing::Test {
} }
return messages; return messages;
} }
// Wait for last stream read to finish.
auto Wait() {
return [this]() mutable -> Poll<Result> {
MutexLock lock(&mu_);
if (last_stream_read_done_) {
return Result{};
} else {
waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
}
};
}
// Wake up the pending Wait() promise.
void Wakeup() {
MutexLock lock(&mu_);
last_stream_read_done_ = true;
waker_.Wakeup();
}
private: private:
struct Result {};
Mutex mu_;
Waker waker_ ABSL_GUARDED_BY(mu_);
bool last_stream_read_done_ ABSL_GUARDED_BY(mu_) = false;
MockEndpoint* control_endpoint_ptr_; MockEndpoint* control_endpoint_ptr_;
MockEndpoint* data_endpoint_ptr_; MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024; size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_; MemoryAllocator memory_allocator_;
Sequence control_endpoint_sequence;
Sequence data_endpoint_sequence;
protected: protected:
MockEndpoint& control_endpoint_; MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_; MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_; event_engine_;
ClientTransport client_transport_; std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_; ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_; Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests. // Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_; Pipe<MessageHandle> pipe_client_to_server_messages_second_;
Pipe<MessageHandle> pipe_server_to_client_messages_second_;
const absl::Status kDummyErrorStatus = Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
absl::ErrnoToStatus(5566, "just an error"); std::vector<absl::AnyInvocable<void(absl::Status)>> read_callback;
static constexpr size_t kDummyRequestSize = 5566u; // Added to verify received message payload.
const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
}; };
TEST_F(ClientTransportTest, AddOneStream) { TEST_F(ClientTransportTest, AddOneStream) {
InitialClientTransport(1);
auto messages = CreateMessages(1); auto messages = CreateMessages(1);
ClientMetadataHandle md; ClientMetadataHandle md;
auto args = CallArgs{ auto args = CallArgs{std::move(md),
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, ClientInitialMetadataOutstandingToken::Empty(),
nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done; StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true)); EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true)); EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true));
auto activity = MakeActivity( auto activity = MakeActivity(
Seq( Seq(
// Concurrently: send message into the pipe, and receive from the // Concurrently: write and read messages in client transport.
// pipe. Join(
Join(Seq(pipe_client_to_server_messages_.sender.Push( // Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])), std::move(messages[0])),
[this] { [this] {
this->pipe_client_to_server_messages_.sender.Close(); pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Concurrently: start read from control endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream will finish with server trailers:
// "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus(); return absl::OkStatus();
}), }),
client_transport_.AddStream(std::move(args))), // Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this]() {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value. // Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) { [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus(); return absl::OkStatus();
}), }),
InlineWakeupScheduler(), EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish. // Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks(); event_engine_->UnsetGlobalHooks();
} }
TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) { TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
InitialClientTransport(1);
auto messages = CreateMessages(1); auto messages = CreateMessages(1);
ClientMetadataHandle md; ClientMetadataHandle md;
auto args = CallArgs{ auto args = CallArgs{std::move(md),
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, ClientInitialMetadataOutstandingToken::Empty(),
nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done; StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write) EXPECT_CALL(control_endpoint_, Write)
.WillOnce( .WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) { WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus); on_write(absl::InternalError("control endpoint write failed."));
return false; return false;
})); }));
EXPECT_CALL(data_endpoint_, Write) EXPECT_CALL(data_endpoint_, Write)
.WillOnce( .WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) { WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus); on_write(absl::InternalError("control endpoint write failed."));
return false; return false;
})); }));
auto activity = MakeActivity( auto activity = MakeActivity(
Seq( Seq(
// Concurrently: send message into the pipe, and receive from the // Concurrently: write and read messages in client transport.
// pipe. Join(
Join(Seq(pipe_client_to_server_messages_.sender.Push( // Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])), std::move(messages[0])),
[this] { [this] {
this->pipe_client_to_server_messages_.sender.Close(); pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Start read from endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream will finish with server trailers:
// "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus(); return absl::OkStatus();
}), }),
client_transport_.AddStream(std::move(args))), // Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value. // Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) { [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
// TODO(ladynana): change these expectations to errors after the // TODO(ladynana): change these expectations to errors after the
// writer activity closes transport for EE failures. // writer activity closes transport for write failures.
EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus(); return absl::OkStatus();
}), }),
InlineWakeupScheduler(), EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish. // Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
@ -231,37 +471,90 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
} }
TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) { TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
InitialClientTransport(1);
auto messages = CreateMessages(3); auto messages = CreateMessages(3);
ClientMetadataHandle md; ClientMetadataHandle md;
auto args = CallArgs{ auto args = CallArgs{std::move(md),
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, ClientInitialMetadataOutstandingToken::Empty(),
nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done; StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
auto activity = MakeActivity( auto activity = MakeActivity(
Seq( Seq(
// Concurrently: send messages into the pipe, and receive from the // Concurrently: write and read messages in client transport.
// pipe. Join(
Join(Seq(pipe_client_to_server_messages_.sender.Push( // Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])), std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push( pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])), std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push( pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])), std::move(messages[2])),
[this] { [this] {
this->pipe_client_to_server_messages_.sender.Close(); pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Start read from endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus(); return absl::OkStatus();
}), }),
client_transport_.AddStream(std::move(args))), [this] {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value. // Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) { [](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus(); return absl::OkStatus();
}), }),
InlineWakeupScheduler(), EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish. // Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
@ -269,51 +562,160 @@ TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
} }
TEST_F(ClientTransportTest, AddMultipleStreams) { TEST_F(ClientTransportTest, AddMultipleStreams) {
InitialClientTransport(2);
auto messages = CreateMessages(2); auto messages = CreateMessages(2);
ClientMetadataHandle md; ClientMetadataHandle first_stream_md;
auto first_stream_args = CallArgs{ ClientMetadataHandle second_stream_md;
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, auto first_stream_args =
nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; CallArgs{std::move(first_stream_md),
auto second_stream_args = CallArgs{ ClientInitialMetadataOutstandingToken::Empty(),
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; &pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done; StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
auto activity = MakeActivity( auto activity = MakeActivity(
Seq( Seq(
// Concurrently: send messages into the pipe, and receive from the // Concurrently: write and read messages from client transport.
// pipe.
Join( Join(
// Send message to first stream pipe. // Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push( Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])), std::move(messages[0])),
[this] { [this] {
pipe_client_to_server_messages_.sender.Close(); pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus(); return absl::OkStatus();
}), }),
// Send message to second stream pipe. // Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_second_.sender.Push( Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[1])), std::move(messages[1])),
[this] { [this] {
pipe_client_to_server_messages_second_.sender.Close(); pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus(); return absl::OkStatus();
}), }),
// Receive message from first stream pipe. // Add first stream with call_args into client transport.
client_transport_.AddStream(std::move(first_stream_args)), Seq(Join(client_transport_->AddStream(
// Receive message from second stream pipe. std::move(first_stream_args)),
client_transport_.AddStream(std::move(second_stream_args))), [this] {
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Add second stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(second_stream_args)),
Seq(Wait(),
[this] {
// Wait until first stream read finished to start
// the second read.
read_callback[1](absl::OkStatus());
return absl::OkStatus();
})),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive first stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Wake up the sencond stream read after first stream read
// finished.
Wakeup();
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
}),
// Receive second stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_second_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_second_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_second_.sender.Close();
pipe_server_intial_metadata_second_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value. // Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status, [](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) { absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok());
EXPECT_TRUE(std::get<4>(ret).ok());
EXPECT_TRUE(std::get<5>(ret).ok());
return absl::OkStatus(); return absl::OkStatus();
}), }),
InlineWakeupScheduler(), EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish. // Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();
@ -321,24 +723,35 @@ TEST_F(ClientTransportTest, AddMultipleStreams) {
} }
TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) { TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
InitialClientTransport(2);
auto messages = CreateMessages(6); auto messages = CreateMessages(6);
ClientMetadataHandle md; ClientMetadataHandle first_stream_md;
auto first_stream_args = CallArgs{ ClientMetadataHandle second_stream_md;
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, auto first_stream_args =
nullptr, &pipe_client_to_server_messages_.receiver, nullptr}; CallArgs{std::move(first_stream_md),
auto second_stream_args = CallArgs{ ClientInitialMetadataOutstandingToken::Empty(),
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr}; &pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done; StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus())); EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true)); EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
auto activity = MakeActivity( auto activity = MakeActivity(
Seq( Seq(
// Concurrently: send messages into the pipe, and receive from the // Concurrently: write and read messages in client transport.
// pipe.
Join( Join(
// Send messages to first stream pipe. // Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push( Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])), std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push( pipe_client_to_server_messages_.sender.Push(
@ -349,7 +762,9 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
pipe_client_to_server_messages_.sender.Close(); pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus(); return absl::OkStatus();
}), }),
// Send messages to second stream pipe. // Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_second_.sender.Push( Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[3])), std::move(messages[3])),
pipe_client_to_server_messages_second_.sender.Push( pipe_client_to_server_messages_second_.sender.Push(
@ -360,20 +775,116 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
pipe_client_to_server_messages_second_.sender.Close(); pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus(); return absl::OkStatus();
}), }),
// Receive messages from first stream pipe. // Add first stream with call_args into client transport.
client_transport_.AddStream(std::move(first_stream_args)), Seq(Join(client_transport_->AddStream(
// Receive messages from second stream pipe. std::move(first_stream_args)),
client_transport_.AddStream(std::move(second_stream_args))), [this] {
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Add second stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(second_stream_args)),
Seq(Wait(),
[this] {
// Wait until first stream read finished to start
// the second read.
read_callback[1](absl::OkStatus());
return absl::OkStatus();
})),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive first stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Wake up the sencond stream read after first stream read
// finished.
Wakeup();
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
}),
// Receive second stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_second_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_second_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_second_.sender.Close();
pipe_server_intial_metadata_second_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value. // Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status, [](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) { absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok()); EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok()); EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok()); EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok()); EXPECT_TRUE(std::get<3>(ret).ok());
EXPECT_TRUE(std::get<4>(ret).ok());
EXPECT_TRUE(std::get<5>(ret).ok());
return absl::OkStatus(); return absl::OkStatus();
}), }),
InlineWakeupScheduler(), EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); }); [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish. // Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle(); event_engine_->TickUntilIdle();

@ -2189,8 +2189,7 @@
"ci_platforms": [ "ci_platforms": [
"linux", "linux",
"mac", "mac",
"posix", "posix"
"windows"
], ],
"cpu_cost": 1.0, "cpu_cost": 1.0,
"exclude_configs": [], "exclude_configs": [],
@ -2202,8 +2201,7 @@
"platforms": [ "platforms": [
"linux", "linux",
"mac", "mac",
"posix", "posix"
"windows"
], ],
"uses_polling": false "uses_polling": false
}, },

Loading…
Cancel
Save