[chaotic-good] Add chaotic good client transport read (roll-forward) (#34806)

Roll forward #34657, which was reverted in #34761.  

Previous error in CMake:
```
[ RUN      ] ClientTransportTest.AddOneStreamMultipleMessages
unknown file: Failure

Unexpected mock function call - returning directly.
    Function call: Call(CANCELLED: )
Google Mock tried the following 1 expectation, but it didn't match:

/[var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc:484](https://cs.corp.google.com/piper///depot/google3/var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc?l=484): EXPECT_CALL(on_done, Call(absl::OkStatus()))...
  Expected arg #0: is equal to OK
           Actual: CANCELLED: 
         Expected: to be called once
           Actual: never called - unsatisfied and active

/[var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc:484](https://cs.corp.google.com/piper///depot/google3/var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc?l=484): Failure
Actual function call count doesn't match EXPECT_CALL(on_done, Call(absl::OkStatus()))...
         Expected: to be called once
           Actual: never called - unsatisfied and active

real 0.24
user 0.00
sys 0.00

2023-10-20 01:50:32,776 FAILED: cmake/build/client_transport_test --gtest_filter=ClientTransportTest.AddOneStreamMultipleMessages  GRPC_POLL_STRATEGY=epoll1 [ret=139, pid=1663532, time=0.3sec]
```
<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34895/head
nanahpang 1 year ago committed by GitHub
parent 1dbbdb5820
commit 3869ef09a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      build_autogenerated.yaml
  2. 18
      src/core/BUILD
  3. 111
      src/core/ext/transport/chaotic_good/client_transport.cc
  4. 106
      src/core/ext/transport/chaotic_good/client_transport.h
  5. 6
      src/core/ext/transport/chaotic_good/frame.cc
  6. 1
      src/core/ext/transport/chaotic_good/frame.h
  7. 8
      src/core/lib/transport/promise_endpoint.cc
  8. 8
      test/core/transport/chaotic_good/BUILD
  9. 489
      test/core/transport/chaotic_good/client_transport_test.cc

@ -7082,12 +7082,13 @@ targets:
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/promise/test_wakeup_schedulers.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc

@ -6050,6 +6050,7 @@ grpc_cc_library(
"arena",
"bitset",
"chaotic_good_frame_header",
"context",
"no_destruct",
"slice",
"slice_buffer",
@ -6212,29 +6213,42 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/random",
"absl/random:bit_gen_ref",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
"absl/types:variant",
],
language = "c++",
deps = [
"activity",
"arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",
"join",
"if",
"inter_activity_pipe",
"loop",
"match",
"memory_quota",
"mpsc",
"pipe",
"seq",
"poll",
"resource_quota",
"slice",
"slice_buffer",
"try_join",
"try_seq",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
"//:hpack_parser",
"//:ref_counted_ptr",
],
)

@ -20,17 +20,26 @@
#include <string>
#include <tuple>
#include "absl/random/bit_gen_ref.h"
#include "absl/random/random.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@ -49,9 +58,14 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_(SliceBuffer()),
data_endpoint_write_buffer_(SliceBuffer()),
hpack_compressor_(std::make_unique<HPackCompressor>()),
hpack_parser_(std::make_unique<HPackParser>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"client_transport")),
arena_(MakeScopedArena(1024, &memory_allocator_)),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
return Seq(
return TrySeq(
// Get next outgoing frame.
this->outgoing_frames_.Next(),
// Construct data buffers that will be sent to the endpoints.
@ -81,20 +95,16 @@ ClientTransport::ClientTransport(
},
// Write buffers to corresponding endpoints concurrently.
[this]() {
return Join(this->control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)),
this->data_endpoint_->Write(
std::move(data_endpoint_write_buffer_)));
return TryJoin(
control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)),
data_endpoint_->Write(std::move(data_endpoint_write_buffer_)));
},
// Finish writes and return status.
[](std::tuple<absl::Status, absl::Status> ret)
-> LoopCtl<absl::Status> {
// If writes failed, return failure status.
if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) {
// TODO(ladynana): handle the promise endpoint write failures with
// closing the transport.
return absl::InternalError("Promise endpoint writes failed.");
}
// Finish writes to difference endpoints and continue the loop.
[]() -> LoopCtl<absl::Status> {
// The write failures will be caught in TrySeq and exit loop.
// Therefore, only need to return Continue() in the last lambda
// function.
return Continue();
});
});
@ -104,7 +114,76 @@ ClientTransport::ClientTransport(
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
});
// TODO(ladynana): handle the promise endpoint write failures with
// outgoing_frames.close() once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
auto read_loop = Loop([this] {
return TrySeq(
// Read frame header from control endpoint.
// TODO(ladynana): remove memcpy in ReadSlice.
this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_),
// Read different parts of the server frame from control/data endpoints
// based on frame header.
[this](Slice read_buffer) mutable {
frame_header_ = std::make_shared<FrameHeader>(
FrameHeader::Parse(
reinterpret_cast<const uint8_t*>(
GRPC_SLICE_START_PTR(read_buffer.c_slice())))
.value());
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
return TryJoin(
control_endpoint_->Read(frame_header_->GetFrameLength()),
data_endpoint_->Read(frame_header_->message_padding +
frame_header_->message_length));
},
// Construct and send the server frame to corresponding stream.
[this](std::tuple<SliceBuffer, SliceBuffer> ret) mutable {
control_endpoint_read_buffer_ = std::move(std::get<0>(ret));
// Discard message padding and only keep message in data read buffer.
std::get<1>(ret).MoveLastNBytesIntoSliceBuffer(
frame_header_->message_length, data_endpoint_read_buffer_);
ServerFragmentFrame frame;
// Initialized to get this_cpu() info in global_stat().
ExecCtx exec_ctx;
// Deserialize frame from read buffer.
absl::BitGen bitgen;
auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_,
absl::BitGenRef(bitgen),
control_endpoint_read_buffer_);
GPR_ASSERT(status.ok());
// Move message into frame.
frame.message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
auto stream_id = frame.frame_header.stream_id;
{
MutexLock lock(&mu_);
return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
}
},
// Check if send frame to corresponding stream successfully.
[](bool ret) -> LoopCtl<absl::Status> {
if (ret) {
// Send incoming frames successfully.
return Continue();
} else {
return absl::InternalError("Send incoming frames failed.");
}
});
});
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint read failures with
// iterating stream_map_ and close all the pipes once available.
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
}
} // namespace chaotic_good

@ -17,29 +17,42 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <initializer_list> // IWYU pragma: keep
#include <map>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "src/core/lib/transport/promise_endpoint.h"
@ -58,18 +71,31 @@ class ClientTransport {
if (writer_ != nullptr) {
writer_.reset();
}
if (reader_ != nullptr) {
reader_.reset();
}
}
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
uint32_t stream_id;
InterActivityPipe<ServerFrame, server_frame_queue_size_> pipe_server_frames;
{
MutexLock lock(&mu_);
stream_id = next_stream_id_++;
stream_map_.insert(
std::pair<uint32_t,
std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>(
stream_id, std::make_shared<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>(
std::move(pipe_server_frames.sender))));
}
return Seq(
// Continuously send data frame with client to server messages.
ForEach(std::move(*call_args.client_to_server_messages),
return TrySeq(
TryJoin(
// Continuously send client frame with client to server messages.
ForEach(
std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
@ -89,7 +115,7 @@ class ClientTransport {
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return Seq(
return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
@ -98,24 +124,86 @@ class ClientTransport {
}
return absl::OkStatus();
});
}));
}),
// Continuously receive server frames from endpoints and save
// results to call_args.
Loop([server_initial_metadata = call_args.server_initial_metadata,
server_to_client_messages =
call_args.server_to_client_messages,
receiver = std::move(pipe_server_frames.receiver)]() mutable {
return TrySeq(
// Receive incoming server frame.
receiver.Next(),
// Save incomming frame results to call_args.
[server_initial_metadata, server_to_client_messages](
absl::optional<ServerFrame> server_frame) mutable {
GPR_ASSERT(server_frame.has_value());
auto frame = std::move(
absl::get<ServerFragmentFrame>(*server_frame));
bool has_headers = (frame.headers != nullptr);
bool has_message = (frame.message != nullptr);
bool has_trailers = (frame.trailers != nullptr);
return TrySeq(
If(
has_headers,
[server_initial_metadata,
headers = std::move(frame.headers)]() mutable {
return server_initial_metadata->Push(
std::move(headers));
},
[] { return false; }),
If(
has_message,
[server_to_client_messages,
message = std::move(frame.message)]() mutable {
return server_to_client_messages->Push(
std::move(message));
},
[] { return false; }),
If(
has_trailers,
[trailers = std::move(frame.trailers)]() mutable
-> LoopCtl<ServerMetadataHandle> {
return std::move(trailers);
},
[]() -> LoopCtl<ServerMetadataHandle> {
return Continue();
}));
});
})),
[](std::tuple<Empty, ServerMetadataHandle> ret) {
return std::move(std::get<1>(ret));
});
}
private:
// Max buffer is set to 4, so that for stream writes each time it will queue
// at most 2 frames.
MpscReceiver<ClientFrame> outgoing_frames_;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Queue size of each stream pipe is set to 2, so that for each stream read it
// will queue at most 2 frames.
static const size_t server_frame_queue_size_ = 2;
// Assigned aligned bytes from setting frame.
size_t aligned_bytes = 64;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of stream incoming server frames, key is stream_id.
std::map<uint32_t, std::shared_ptr<InterActivityPipe<
ServerFrame, server_frame_queue_size_>::Sender>>
stream_map_ ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;
std::unique_ptr<PromiseEndpoint> data_endpoint_;
SliceBuffer control_endpoint_write_buffer_;
SliceBuffer data_endpoint_write_buffer_;
SliceBuffer control_endpoint_read_buffer_;
SliceBuffer data_endpoint_read_buffer_;
std::unique_ptr<HPackCompressor> hpack_compressor_;
std::unique_ptr<HPackParser> hpack_parser_;
std::shared_ptr<FrameHeader> frame_header_;
MemoryAllocator memory_allocator_;
ScopedArenaPtr arena_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};

@ -32,6 +32,8 @@
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -134,7 +136,9 @@ absl::StatusOr<Arena::PoolPtr<Metadata>> ReadMetadata(
absl::BitGenRef bitsrc) {
if (!maybe_slices.ok()) return maybe_slices.status();
auto& slices = *maybe_slices;
Arena::PoolPtr<Metadata> metadata;
auto arena = GetContext<Arena>();
GPR_ASSERT(arena != nullptr);
Arena::PoolPtr<Metadata> metadata = arena->MakePooled<Metadata>(arena);
parser->BeginFrame(
metadata.get(), std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::max(),

@ -97,6 +97,7 @@ struct ServerFragmentFrame final : public FrameInterface {
FrameHeader frame_header;
ServerMetadataHandle headers;
MessageHandle message;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {

@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint(
}
PromiseEndpoint::~PromiseEndpoint() {
// Last write result has not been polled.
GPR_ASSERT(!write_result_.has_value());
// Last read result has not been polled.
GPR_ASSERT(!read_result_.has_value());
// Promise endpoint close when last write result has not been polled.
write_result_.reset();
// Promise endpoint close when last read result has not been polled.
read_result_.reset();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&

@ -87,7 +87,9 @@ grpc_cc_test(
srcs = ["client_transport_test.cc"],
external_deps = [
"absl/functional:any_invocable",
"absl/status:statusor",
"absl/strings:str_format",
"absl/types:optional",
"gtest",
],
language = "C++",
@ -95,12 +97,17 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//:grpc",
"//:grpc_public_hdrs",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:if",
"//src/core:join",
"//src/core:loop",
"//src/core:map",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
@ -109,6 +116,5 @@ grpc_cc_test(
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/promise:test_wakeup_schedulers",
],
)

@ -18,36 +18,46 @@
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <string> // IWYU pragma: keep
#include <tuple>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep
#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
using testing::Return;
using testing::Sequence;
using testing::StrictMock;
using testing::WithArgs;
@ -101,29 +111,187 @@ class ClientTransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())),
client_transport_(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_),
SliceBuffer()),
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()) {}
std::vector<MessageHandle> CreateMessages(int num_of_messages) {
std::vector<MessageHandle> messages;
for (int i = 0; i < num_of_messages; i++) {
SliceBuffer buffer;
buffer.Append(
Slice::FromCopiedString(absl::StrFormat("test message %d", i)));
auto message = arena_->MakePooled<Message>(std::move(buffer), 0);
messages.push_back(std::move(message));
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()),
pipe_server_to_client_messages_second_(arena_.get()),
pipe_server_intial_metadata_second_(arena_.get()) {}
// Expect how client transport will read from control/data endpoints with a
// test frame.
void AddReadExpectations(int num_of_streams) {
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<0, 1>(
[this, i](absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer*
buffer) mutable {
// Construct test frame for EventEngine read: headers (15
// bytes), message(16 bytes), message padding (48 byte),
// trailers (15 bytes).
const std::string frame_header = {
static_cast<char>(0x80), // frame type = fragment
0x03, // flag = has header + has trailer
0x00,
0x00,
static_cast<char>(i + 1), // stream id = 1
0x00,
0x00,
0x00,
0x1a, // header length = 26
0x00,
0x00,
0x00,
0x08, // message length = 8
0x00,
0x00,
0x00,
0x38, // message padding =56
0x00,
0x00,
0x00,
0x0f, // trailer length = 15
0x00,
0x00,
0x00};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(frame_header));
buffer->Append(std::move(slice));
// Execute read callback later to control when read starts.
if (i == 0) {
read_callback_ = std::move(on_read);
// Return false to mock EventEngine read not finish.
return false;
} else {
return true;
}
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<1>(
[](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Encoded string of header ":path: /demo.Service/Step".
const std::string header = {
0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
// Encoded string of trailer "grpc-status: 0".
const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70,
0x63, 0x2d, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x01, 0x30};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(header + trailers));
buffer->Append(std::move(slice));
return true;
}));
}
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(Return(false));
for (int i = 0; i < num_of_streams; i++) {
EXPECT_CALL(data_endpoint_, Read)
.InSequence(data_endpoint_sequence)
.WillOnce(WithArgs<1>(
[this](grpc_event_engine::experimental::SliceBuffer* buffer) {
const std::string message_padding = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(message_padding + message_));
buffer->Append(std::move(slice));
return true;
}));
}
return messages;
}
// Initial ClientTransport with read expecations
void InitialClientTransport(int num_of_streams) {
// Read expectaions need to be added before transport initialization since
// reader_ activity loop is started in ClientTransport initialization,
AddReadExpectations(num_of_streams);
client_transport_ = std::make_unique<ClientTransport>(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
event_engine_);
}
// Send messages from client to server.
auto SendClientToServerMessages(
Pipe<MessageHandle>& pipe_client_to_server_messages,
int num_of_messages) {
return Loop([&pipe_client_to_server_messages, num_of_messages,
this]() mutable {
bool has_message = (num_of_messages > 0);
return If(
has_message,
Seq(pipe_client_to_server_messages.sender.Push(
arena_->MakePooled<Message>()),
[&num_of_messages]() -> LoopCtl<absl::Status> {
num_of_messages--;
return Continue();
}),
[&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
pipe_client_to_server_messages.sender.Close();
return absl::OkStatus();
});
});
}
// Add stream into client transport, and expect return trailers of
// "grpc-status:code".
auto AddStream(CallArgs args, const grpc_status_code trailers) {
return Seq(client_transport_->AddStream(std::move(args)),
[trailers](ServerMetadataHandle ret) {
// AddStream will finish with server trailers:
// "grpc-status:code".
EXPECT_EQ(ret->get(GrpcStatusMetadata()).value(), trailers);
return trailers;
});
}
// Start read from control endpoints.
auto StartRead(const absl::Status& read_status) {
return [read_status, this] {
read_callback_(read_status);
return read_status;
};
}
// Receive messages from server to client.
auto ReceiveServerToClientMessages(
Pipe<ServerMetadataHandle>& pipe_server_intial_metadata,
Pipe<MessageHandle>& pipe_server_to_client_messages) {
return Seq(
// Receive server initial metadata.
Map(pipe_server_intial_metadata.receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
// Expect value: ":path: /demo.Service/Step"
EXPECT_TRUE(r.has_value());
EXPECT_EQ(
r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/demo.Service/Step");
return absl::OkStatus();
}),
// Receive server to client messages.
Map(pipe_server_to_client_messages.receiver.Next(),
[this](NextResult<MessageHandle> r) {
EXPECT_TRUE(r.has_value());
EXPECT_EQ(r.value()->payload()->JoinIntoString(), message_);
return absl::OkStatus();
}),
[&pipe_server_intial_metadata,
&pipe_server_to_client_messages]() mutable {
// Close pipes after receive message.
pipe_server_to_client_messages.sender.Close();
pipe_server_intial_metadata.sender.Close();
return absl::OkStatus();
});
}
private:
@ -131,97 +299,65 @@ class ClientTransportTest : public ::testing::Test {
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
Sequence control_endpoint_sequence;
Sequence data_endpoint_sequence;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
ClientTransport client_transport_;
std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
const absl::Status kDummyErrorStatus =
absl::ErrnoToStatus(5566, "just an error");
static constexpr size_t kDummyRequestSize = 5566u;
Pipe<MessageHandle> pipe_server_to_client_messages_second_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
absl::AnyInvocable<void(absl::Status)> read_callback_;
// Added to verify received message payload.
const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ClientTransportTest, AddOneStream) {
auto messages = CreateMessages(1);
InitialClientTransport(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
auto messages = CreateMessages(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Concurrently: write and read messages in client transport.
Join(
// Add first stream with call_args into client transport.
AddStream(std::move(args), GRPC_STATUS_OK),
// Start read from control endpoints.
StartRead(absl::OkStatus()),
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 1),
// Receive messages from control/data endpoints.
ReceiveServerToClientMessages(pipe_server_intial_metadata_,
pipe_server_to_client_messages_)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
// TODO(ladynana): change these expectations to errors after the
// writer activity closes transport for EE failures.
EXPECT_TRUE(std::get<0>(ret).ok());
[](const std::tuple<grpc_status_code, absl::Status, absl::Status,
absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@ -229,89 +365,42 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
}
TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
auto messages = CreateMessages(3);
InitialClientTransport(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreams) {
auto messages = CreateMessages(2);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
// Concurrently: write and read messages in client transport.
Join(
// Send message to first stream pipe.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send message to second stream pipe.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[1])),
[this] {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive message from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive message from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Add first stream with call_args into client transport.
AddStream(std::move(args), GRPC_STATUS_OK),
// Start read from control endpoints.
StartRead(absl::OkStatus()),
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 3),
// Receive messages from control/data endpoints.
ReceiveServerToClientMessages(pipe_server_intial_metadata_,
pipe_server_to_client_messages_)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
[](const std::tuple<grpc_status_code, absl::Status, absl::Status,
absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@ -319,59 +408,63 @@ TEST_F(ClientTransportTest, AddMultipleStreams) {
}
TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
auto messages = CreateMessages(6);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
InitialClientTransport(2);
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
// Concurrently: write and read messages from client transport.
Join(
// Add first stream with call_args into client transport.
AddStream(std::move(first_stream_args), GRPC_STATUS_OK),
// Start read from control endpoints.
StartRead(absl::OkStatus()),
// Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 3),
// Receive first stream's messages from control/data endpoints.
ReceiveServerToClientMessages(pipe_server_intial_metadata_,
pipe_server_to_client_messages_)),
Join(
// Send messages to first stream pipe.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])),
[this] {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send messages to second stream pipe.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[3])),
pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[4])),
pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[5])),
[this] {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive messages from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive messages from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Add second stream with call_args into client transport.
AddStream(std::move(second_stream_args), GRPC_STATUS_OK),
// Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_second_,
3),
// Receive second stream's messages from control/data endpoints.
ReceiveServerToClientMessages(
pipe_server_intial_metadata_second_,
pipe_server_to_client_messages_second_)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
[](const std::tuple<grpc_status_code, absl::Status, absl::Status>&
ret) {
EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();

Loading…
Cancel
Save