[chaotic-good] Add chaotic good client transport read (roll-forward) (#34657)

Roll forward #34191, which is reverted due to error `2023-10-09
22:01:18,569 FAILED: cmake/build/client_transport_test
--gtest_filter=ClientTransportTest.AddMultipleStreams
GRPC_POLL_STRATEGY=none` (Removed uses_event_engine=False,
uses_polling=False in test build).

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34761/head
nanahpang 1 year ago committed by GitHub
parent 65d4df25ed
commit a78145514d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 8
      build_autogenerated.yaml
  3. 18
      src/core/BUILD
  4. 110
      src/core/ext/transport/chaotic_good/client_transport.cc
  5. 96
      src/core/ext/transport/chaotic_good/client_transport.h
  6. 6
      src/core/ext/transport/chaotic_good/frame.cc
  7. 1
      src/core/ext/transport/chaotic_good/frame.h
  8. 8
      src/core/lib/transport/promise_endpoint.cc
  9. 10
      test/core/transport/chaotic_good/BUILD
  10. 683
      test/core/transport/chaotic_good/client_transport_test.cc
  11. 8
      tools/run_tests/generated/tests.json

4
CMakeLists.txt generated

@ -948,7 +948,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_ssl_test)
endif()
add_dependencies(buildtests_cxx client_streaming_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx client_transport_test)
endif()
add_dependencies(buildtests_cxx cmdline_test)
add_dependencies(buildtests_cxx codegen_test_full)
add_dependencies(buildtests_cxx codegen_test_minimal)
@ -9337,6 +9339,7 @@ target_link_libraries(client_streaming_test
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(client_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
@ -9378,6 +9381,7 @@ target_link_libraries(client_transport_test
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -7069,12 +7069,13 @@ targets:
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/promise/test_wakeup_schedulers.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc
@ -7087,7 +7088,10 @@ targets:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
platforms:
- linux
- posix
- mac
- name: cmdline_test
gtest: true
build: test

@ -6020,6 +6020,7 @@ grpc_cc_library(
"arena",
"bitset",
"chaotic_good_frame_header",
"context",
"no_destruct",
"slice",
"slice_buffer",
@ -6182,29 +6183,42 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/random",
"absl/random:bit_gen_ref",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
deps = [
"activity",
"arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",
"join",
"if",
"inter_activity_pipe",
"loop",
"match",
"memory_quota",
"mpsc",
"pipe",
"seq",
"poll",
"resource_quota",
"slice",
"slice_buffer",
"try_join",
"try_seq",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//:ref_counted_ptr",
],
)

@ -20,17 +20,26 @@
#include <string>
#include <tuple>
#include "absl/random/bit_gen_ref.h"
#include "absl/random/random.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@ -49,9 +58,14 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_(SliceBuffer()),
data_endpoint_write_buffer_(SliceBuffer()),
hpack_compressor_(std::make_unique<HPackCompressor>()),
hpack_parser_(std::make_unique<HPackParser>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"client_transport")),
arena_(MakeScopedArena(1024, &memory_allocator_)),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
return Seq(
return TrySeq(
// Get next outgoing frame.
this->outgoing_frames_.Next(),
// Construct data buffers that will be sent to the endpoints.
@ -81,20 +95,16 @@ ClientTransport::ClientTransport(
},
// Write buffers to corresponding endpoints concurrently.
[this]() {
return Join(this->control_endpoint_->Write(
return TryJoin(
control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)),
this->data_endpoint_->Write(
std::move(data_endpoint_write_buffer_)));
data_endpoint_->Write(std::move(data_endpoint_write_buffer_)));
},
// Finish writes and return status.
[](std::tuple<absl::Status, absl::Status> ret)
-> LoopCtl<absl::Status> {
// If writes failed, return failure status.
if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) {
// TODO(ladynana): handle the promise endpoint write failures with
// closing the transport.
return absl::InternalError("Promise endpoint writes failed.");
}
// Finish writes to difference endpoints and continue the loop.
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
// Therefore, only need to return Continue() in the last lambda
// function.
return Continue();
});
});
@ -104,7 +114,79 @@ ClientTransport::ClientTransport(
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint write failures with
// outgoing_frames.close() once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
auto read_loop = Loop([this] {
return TrySeq(
// Read frame header from control endpoint.
// TODO(ladynana): remove memcpy in ReadSlice.
this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_),
// Read different parts of the server frame from control/data endpoints
// based on frame header.
[this](Slice read_buffer) mutable {
frame_header_ = std::make_shared<FrameHeader>(
FrameHeader::Parse(
reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(read_buffer.c_slice())))
.value());
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
return TryJoin(
control_endpoint_->Read(frame_header_->GetFrameLength()),
data_endpoint_->Read(frame_header_->message_padding +
frame_header_->message_length));
},
// Construct and send the server frame to corresponding stream.
[this](std::tuple<SliceBuffer, SliceBuffer> ret) mutable {
control_endpoint_read_buffer_ = std::move(std::get<0>(ret));
// Discard message padding and only keep message in data read buffer.
std::get<1>(ret).MoveLastNBytesIntoSliceBuffer(
frame_header_->message_length, data_endpoint_read_buffer_);
ServerFragmentFrame frame;
// Initialized to get this_cpu() info in global_stat().
ExecCtx exec_ctx;
// Deserialize frame from read buffer.
absl::BitGen bitgen;
auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_,
absl::BitGenRef(bitgen),
control_endpoint_read_buffer_);
GPR_ASSERT(status.ok());
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
std::shared_ptr<
InterActivityPipe<ServerFrame, server_frame_queue_size_>::Sender>
sender;
{
MutexLock lock(&mu_);
sender = stream_map_[frame.frame_header.stream_id];
}
return sender->Push(ServerFrame(std::move(frame)));
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {
if (ret) {
// Send incoming frames successfully.
return Continue();
} else {
return absl::InternalError("Send incoming frames failed.");
}
});
});
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint read failures with
// iterating stream_map_ and close all the pipes once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
}
} // namespace chaotic_good

@ -21,25 +21,38 @@
#include <stdint.h>
#include <initializer_list> // IWYU pragma: keep
#include <map>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "src/core/lib/transport/promise_endpoint.h"
@ -58,18 +71,31 @@ class ClientTransport {
if (writer_ != nullptr) {
writer_.reset();
}
if (reader_ != nullptr) {
reader_.reset();
}
}
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
uint32_t stream_id;
InterActivityPipe<ServerFrame, server_frame_queue_size_> server_frames;
{
MutexLock lock(&mu_);
stream_id = next_stream_id_++;
stream_map_.insert(
std::pair<uint32_t,
std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>(
stream_id, std::make_shared<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>(
std::move(server_frames.sender))));
}
return Seq(
// Continuously send data frame with client to server messages.
ForEach(std::move(*call_args.client_to_server_messages),
return TrySeq(
TryJoin(
// Continuously send client frame with client to server messages.
ForEach(
std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
@ -89,7 +115,7 @@ class ClientTransport {
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return Seq(
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
@ -98,24 +124,80 @@ class ClientTransport {
}
return absl::OkStatus();
});
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,
server_to_client_messages =
call_args.server_to_client_messages,
receiver = std::move(server_frames.receiver)]() mutable {
return TrySeq(
// Receive incoming server frame.
receiver.Next(),
// Save incomming frame results to call_args.
[server_initial_metadata, server_to_client_messages](
absl::optional<ServerFrame> server_frame) mutable {
GPR_ASSERT(server_frame.has_value());
auto frame = std::move(
absl::get<ServerFragmentFrame>(*server_frame));
return TrySeq(
If((frame.headers != nullptr),
[server_initial_metadata,
headers = std::move(frame.headers)]() mutable {
return server_initial_metadata->Push(
std::move(headers));
},
[] { return false; }),
If((frame.message != nullptr),
[server_to_client_messages,
message = std::move(frame.message)]() mutable {
return server_to_client_messages->Push(
std::move(message));
},
[] { return false; }),
If((frame.trailers != nullptr),
[trailers = std::move(frame.trailers)]() mutable
-> LoopCtl<ServerMetadataHandle> {
return std::move(trailers);
},
[]() -> LoopCtl<ServerMetadataHandle> {
return Continue();
}));
});
})),
[](std::tuple<Empty, ServerMetadataHandle> ret) {
return std::move(std::get<1>(ret));
});
}
private:
// Max buffer is set to 4, so that for stream writes each time it will queue
// at most 2 frames.
MpscReceiver<ClientFrame> outgoing_frames_;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Queue size of each stream pipe is set to 2, so that for each stream read it
// will queue at most 2 frames.
static const size_t server_frame_queue_size_ = 2;
// Assigned aligned bytes from setting frame.
size_t aligned_bytes = 64;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
std::map<uint32_t, std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>
stream_map_ ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;
std::unique_ptr<PromiseEndpoint> data_endpoint_;
SliceBuffer control_endpoint_write_buffer_;
SliceBuffer data_endpoint_write_buffer_;
SliceBuffer control_endpoint_read_buffer_;
SliceBuffer data_endpoint_read_buffer_;
std::unique_ptr<HPackCompressor> hpack_compressor_;
std::unique_ptr<HPackParser> hpack_parser_;
std::shared_ptr<FrameHeader> frame_header_;
MemoryAllocator memory_allocator_;
ScopedArenaPtr arena_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};

@ -32,6 +32,8 @@
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -130,7 +132,9 @@ absl::StatusOr<Arena::PoolPtr<Metadata>> ReadMetadata(
absl::BitGenRef bitsrc) {
if (!maybe_slices.ok()) return maybe_slices.status();
auto& slices = *maybe_slices;
Arena::PoolPtr<Metadata> metadata;
auto arena = GetContext<Arena>();
GPR_ASSERT(arena != nullptr);
Arena::PoolPtr<Metadata> metadata = arena->MakePooled<Metadata>(arena);
parser->BeginFrame(
metadata.get(), std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::max(),

@ -97,6 +97,7 @@ struct ServerFragmentFrame final : public FrameInterface {
FrameHeader frame_header;
ServerMetadataHandle headers;
MessageHandle message;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {

@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint(
}
PromiseEndpoint::~PromiseEndpoint() {
// Last write result has not been polled.
GPR_ASSERT(!write_result_.has_value());
// Last read result has not been polled.
GPR_ASSERT(!read_result_.has_value());
// Promise endpoint close when last write result has not been polled.
write_result_.reset();
// Promise endpoint close when last read result has not been polled.
read_result_.reset();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&

@ -87,20 +87,25 @@ grpc_cc_test(
srcs = ["client_transport_test.cc"],
external_deps = [
"absl/functional:any_invocable",
"absl/status:statusor",
"absl/strings:str_format",
"absl/types:optional",
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
# TODO(ladynana): remove the no_windows tag.
tags = ["no_windows"],
deps = [
"//:grpc",
"//:grpc_public_hdrs",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:join",
"//src/core:map",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
@ -109,6 +114,5 @@ grpc_cc_test(
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/promise:test_wakeup_schedulers",
],
)

@ -18,23 +18,30 @@
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <string> // IWYU pragma: keep
#include <tuple>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep
#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
@ -42,12 +49,14 @@
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
using testing::Return;
using testing::Sequence;
using testing::StrictMock;
using testing::WithArgs;
@ -101,19 +110,117 @@ class ClientTransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())),
client_transport_(
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()),
pipe_server_to_client_messages_second_(arena_.get()),
pipe_server_intial_metadata_second_(arena_.get()) {}
// Expect how client transport will read from control/data endpoints with a
// test frame.
void AddReadExpectations(int num_of_streams) {
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<0, 1>(
[this, i](absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer*
buffer) mutable {
// Construct test frame for EventEngine read: headers (15
// bytes), message(16 bytes), message padding (48 byte),
// trailers (15 bytes).
const std::string frame_header = {
static_cast<char>(0x80), // frame type = fragment
0x03, // flag = has header + has trailer
0x00,
0x00,
static_cast<char>(i + 1), // stream id = 1
0x00,
0x00,
0x00,
0x1a, // header length = 26
0x00,
0x00,
0x00,
0x08, // message length = 8
0x00,
0x00,
0x00,
0x38, // message padding =56
0x00,
0x00,
0x00,
0x0f, // trailer length = 15
0x00,
0x00,
0x00};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(frame_header));
buffer->Append(std::move(slice));
// Execute read callback later to control when read starts.
read_callback.push_back(std::move(on_read));
// Return false to mock EventEngine read not finish.
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<1>(
[](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Encoded string of header ":path: /demo.Service/Step".
const std::string header = {
0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
// Encoded string of trailer "grpc-status: 0".
const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70,
0x63, 0x2d, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x01, 0x30};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(header + trailers));
buffer->Append(std::move(slice));
return true;
}));
}
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(Return(false));
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(data_endpoint_, Read)
.InSequence(data_endpoint_sequence)
.WillOnce(WithArgs<1>(
[this](grpc_event_engine::experimental::SliceBuffer* buffer) {
const std::string message_padding = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(message_padding + message));
buffer->Append(std::move(slice));
return true;
}));
}
}
// Initial ClientTransport with read expecations
void InitialClientTransport(int num_of_streams) {
// Read expectaions need to be added before transport initialization since
// reader_ activity loop is started in ClientTransport initialization,
AddReadExpectations(num_of_streams);
client_transport_ = std::make_unique<ClientTransport>(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_),
SliceBuffer()),
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()) {}
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
std::static_pointer_cast<grpc_event_engine::experimental::EventEngine>(
event_engine_));
}
// Create client to server test messages.
std::vector<MessageHandle> CreateMessages(int num_of_messages) {
std::vector<MessageHandle> messages;
for (int i = 0; i < num_of_messages; i++) {
@ -125,103 +232,238 @@ class ClientTransportTest : public ::testing::Test {
}
return messages;
}
// Wait for last stream read to finish.
auto Wait() {
return [this]() mutable -> Poll<Result> {
MutexLock lock(&mu_);
if (last_stream_read_done_) {
return Result{};
} else {
waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
}
};
}
// Wake up the pending Wait() promise.
void Wakeup() {
MutexLock lock(&mu_);
last_stream_read_done_ = true;
waker_.Wakeup();
}
private:
struct Result {};
Mutex mu_;
Waker waker_ ABSL_GUARDED_BY(mu_);
bool last_stream_read_done_ ABSL_GUARDED_BY(mu_) = false;
MockEndpoint* control_endpoint_ptr_;
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
Sequence control_endpoint_sequence;
Sequence data_endpoint_sequence;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
ClientTransport client_transport_;
std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
const absl::Status kDummyErrorStatus =
absl::ErrnoToStatus(5566, "just an error");
static constexpr size_t kDummyRequestSize = 5566u;
Pipe<MessageHandle> pipe_server_to_client_messages_second_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
std::vector<absl::AnyInvocable<void(absl::Status)>> read_callback;
// Added to verify received message payload.
const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ClientTransportTest, AddOneStream) {
InitialClientTransport(1);
auto messages = CreateMessages(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
// Concurrently: write and read messages in client transport.
Join(
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Concurrently: start read from control endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream will finish with server trailers:
// "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this]() {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
[](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
InitialClientTransport(1);
auto messages = CreateMessages(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
// Concurrently: write and read messages in client transport.
Join(
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Start read from endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream will finish with server trailers:
// "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
[](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
// TODO(ladynana): change these expectations to errors after the
// writer activity closes transport for EE failures.
// writer activity closes transport for write failures.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@ -229,37 +471,90 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
}
TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
InitialClientTransport(1);
auto messages = CreateMessages(3);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
// Concurrently: write and read messages in client transport.
Join(
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(std::move(args)),
[this]() {
// Start read from endpoints.
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
[](const std::tuple<absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@ -267,51 +562,160 @@ TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
}
TEST_F(ClientTransportTest, AddMultipleStreams) {
InitialClientTransport(2);
auto messages = CreateMessages(2);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
// Concurrently: write and read messages from client transport.
Join(
// Send message to first stream pipe.
// Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send message to second stream pipe.
// Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[1])),
[this] {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive message from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive message from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(first_stream_args)),
[this] {
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Add second stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(second_stream_args)),
Seq(Wait(),
[this] {
// Wait until first stream read finished to start
// the second read.
read_callback[1](absl::OkStatus());
return absl::OkStatus();
})),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive first stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Wake up the sencond stream read after first stream read
// finished.
Wakeup();
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
}),
// Receive second stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_second_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_second_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_second_.sender.Close();
pipe_server_intial_metadata_second_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) {
absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
EXPECT_TRUE(std::get<4>(ret).ok());
EXPECT_TRUE(std::get<5>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@ -319,24 +723,35 @@ TEST_F(ClientTransportTest, AddMultipleStreams) {
}
TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
InitialClientTransport(2);
auto messages = CreateMessages(6);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
// Concurrently: write and read messages in client transport.
Join(
// Send messages to first stream pipe.
// Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
@ -347,7 +762,9 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send messages to second stream pipe.
// Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[3])),
pipe_client_to_server_messages_second_.sender.Push(
@ -358,20 +775,116 @@ TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive messages from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive messages from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Add first stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(first_stream_args)),
[this] {
read_callback[0](absl::OkStatus());
return absl::OkStatus();
}),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Add second stream with call_args into client transport.
Seq(Join(client_transport_->AddStream(
std::move(second_stream_args)),
Seq(Wait(),
[this] {
// Wait until first stream read finished to start
// the second read.
read_callback[1](absl::OkStatus());
return absl::OkStatus();
})),
[](std::tuple<absl::StatusOr<ServerMetadataHandle>,
absl::Status>
ret) {
// AddStream finish with trailers "grpc-status:0".
EXPECT_EQ(std::get<0>(ret)
.value()
->get(GrpcStatusMetadata())
.value(),
grpc_status_code::GRPC_STATUS_OK);
return absl::OkStatus();
}),
// Receive first stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Wake up the sencond stream read after first stream read
// finished.
Wakeup();
// Close pipes after receive message.
pipe_server_to_client_messages_.sender.Close();
pipe_server_intial_metadata_.sender.Close();
return absl::OkStatus();
}),
// Receive second stream's messages from control/data endpoints.
Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata_second_.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages_second_.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(),
message);
return absl::OkStatus();
}),
[this] {
// Close pipes after receive message.
pipe_server_to_client_messages_second_.sender.Close();
pipe_server_intial_metadata_second_.sender.Close();
return absl::OkStatus();
})),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) {
absl::Status, absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
EXPECT_TRUE(std::get<4>(ret).ok());
EXPECT_TRUE(std::get<5>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();

@ -2189,8 +2189,7 @@
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
@ -2202,10 +2201,9 @@
"platforms": [
"linux",
"mac",
"posix",
"windows"
"posix"
],
"uses_polling": false
"uses_polling": true
},
{
"args": [],

Loading…
Cancel
Save