[chaotic-good] Client transport write path (#33876)

This is the initial implementation of the chaotic-good client transport
write path. There will be a follow-up PR to fulfill the read path.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34276/head
nanahpang 1 year ago committed by GitHub
parent 90d185106a
commit 9907d94da4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      CMakeLists.txt
  2. 29
      build_autogenerated.yaml
  3. 37
      src/core/BUILD
  4. 120
      src/core/ext/transport/chaotic_good/client_transport.cc
  5. 115
      src/core/ext/transport/chaotic_good/client_transport.h
  6. 1
      src/core/ext/transport/chaotic_good/frame.h
  7. 3
      src/core/lib/transport/promise_endpoint.h
  8. 31
      test/core/transport/chaotic_good/BUILD
  9. 394
      test/core/transport/chaotic_good/client_transport_test.cc
  10. 24
      tools/run_tests/generated/tests.json

44
CMakeLists.txt generated

@ -919,6 +919,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_ssl_test)
endif()
add_dependencies(buildtests_cxx client_streaming_test)
add_dependencies(buildtests_cxx client_transport_test)
add_dependencies(buildtests_cxx cmdline_test)
add_dependencies(buildtests_cxx codegen_test_full)
add_dependencies(buildtests_cxx codegen_test_minimal)
@ -9195,6 +9196,49 @@ target_link_libraries(client_streaming_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(client_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
src/core/ext/transport/chaotic_good/client_transport.cc
src/core/ext/transport/chaotic_good/frame.cc
src/core/ext/transport/chaotic_good/frame_header.cc
src/core/lib/transport/promise_endpoint.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/transport/chaotic_good/client_transport_test.cc
)
target_compile_features(client_transport_test PUBLIC cxx_std_14)
target_include_directories(client_transport_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(client_transport_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -6935,6 +6935,35 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: client_transport_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/chaotic_good/client_transport.h
- src/core/ext/transport/chaotic_good/frame.h
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/promise/test_wakeup_schedulers.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc
- src/core/ext/transport/chaotic_good/frame.cc
- src/core/ext/transport/chaotic_good/frame_header.cc
- src/core/lib/transport/promise_endpoint.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/transport/chaotic_good/client_transport_test.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: cmdline_test
gtest: true
build: test

@ -5976,6 +5976,43 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "chaotic_good_client_transport",
srcs = [
"ext/transport/chaotic_good/client_transport.cc",
],
hdrs = [
"ext/transport/chaotic_good/client_transport.h",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/types:variant",
],
language = "c++",
deps = [
"activity",
"chaotic_good_frame",
"chaotic_good_frame_header",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",
"join",
"loop",
"match",
"mpsc",
"pipe",
"seq",
"slice",
"slice_buffer",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
],
)
### UPB Targets
grpc_upb_proto_library(

@ -0,0 +1,120 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include <memory>
#include <string>
#include <tuple>
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/promise_endpoint.h"
namespace grpc_core {
namespace chaotic_good {
ClientTransport::ClientTransport(
std::unique_ptr<PromiseEndpoint> control_endpoint,
std::unique_ptr<PromiseEndpoint> data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
: outgoing_frames_(MpscReceiver<ClientFrame>(4)),
control_endpoint_(std::move(control_endpoint)),
data_endpoint_(std::move(data_endpoint)),
control_endpoint_write_buffer_(SliceBuffer()),
data_endpoint_write_buffer_(SliceBuffer()),
hpack_compressor_(std::make_unique<HPackCompressor>()),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
return Seq(
// Get next outgoing frame.
this->outgoing_frames_.Next(),
// Construct data buffers that will be sent to the endpoints.
[this](ClientFrame client_frame) {
MatchMutable(
&client_frame,
[this](ClientFragmentFrame* frame) mutable {
control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get()));
if (frame->message != nullptr) {
auto frame_header =
FrameHeader::Parse(
reinterpret_cast<const uint8_t*>(GRPC_SLICE_START_PTR(
control_endpoint_write_buffer_.c_slice_buffer()
->slices[0])))
.value();
std::string message_padding(frame_header.message_padding,
'0');
Slice slice(grpc_slice_from_cpp_string(message_padding));
// Append message payload to data_endpoint_buffer.
data_endpoint_write_buffer_.Append(std::move(slice));
// Append message payload to data_endpoint_buffer.
frame->message->payload()->MoveFirstNBytesIntoSliceBuffer(
frame->message->payload()->Length(),
data_endpoint_write_buffer_);
}
},
[this](CancelFrame* frame) mutable {
control_endpoint_write_buffer_.Append(
frame->Serialize(hpack_compressor_.get()));
});
return absl::OkStatus();
},
// Write buffers to corresponding endpoints concurrently.
[this]() {
return Join(this->control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)),
this->data_endpoint_->Write(
std::move(data_endpoint_write_buffer_)));
},
// Finish writes and return status.
[](std::tuple<absl::Status, absl::Status> ret)
-> LoopCtl<absl::Status> {
// If writes failed, return failure status.
if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) {
// TODO(ladynana): handle the promise endpoint write failures with
// closing the transport.
return absl::InternalError("Promise endpoint writes failed.");
}
return Continue();
});
});
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
});
}
} // namespace chaotic_good
} // namespace grpc_core

@ -0,0 +1,115 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <initializer_list> // IWYU pragma: keep
#include <memory>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
namespace chaotic_good {
class ClientTransport {
public:
ClientTransport(std::unique_ptr<PromiseEndpoint> control_endpoint,
std::unique_ptr<PromiseEndpoint> data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine);
~ClientTransport() {
if (writer_ != nullptr) {
writer_.reset();
}
}
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
uint64_t stream_id;
{
MutexLock lock(&mu_);
stream_id = next_stream_id_++;
}
return Seq(
// Continuously send data frame with client to server messages.
ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
outgoing_frames = outgoing_frames_.MakeSender()](
MessageHandle result) mutable {
ClientFragmentFrame frame;
frame.stream_id = stream_id;
frame.message = std::move(result);
if (initial_frame) {
// Send initial frame with client intial metadata.
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
return Seq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
return absl::InternalError(
"Send frame to outgoing_frames failed.");
}
return absl::OkStatus();
});
}));
}
private:
// Max buffer is set to 4, so that for stream writes each time it will queue
// at most 2 frames.
MpscReceiver<ClientFrame> outgoing_frames_;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;
std::unique_ptr<PromiseEndpoint> data_endpoint_;
SliceBuffer control_endpoint_write_buffer_;
SliceBuffer data_endpoint_write_buffer_;
std::unique_ptr<HPackCompressor> hpack_compressor_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
} // namespace chaotic_good
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_TRANSPORT_H

@ -75,6 +75,7 @@ struct ClientFragmentFrame final : public FrameInterface {
uint32_t stream_id;
ClientMetadataHandle headers;
MessageHandle message;
bool end_of_stream = false;
bool operator==(const ClientFragmentFrame& other) const {

@ -51,6 +51,9 @@ class PromiseEndpoint {
endpoint,
SliceBuffer already_received);
~PromiseEndpoint();
/// Prevent copying and moving of PromiseEndpoint.
PromiseEndpoint(const PromiseEndpoint&) = delete;
PromiseEndpoint(PromiseEndpoint&&) = delete;
// Returns a promise that resolves to a `absl::Status` indicating the result
// of the write operation.

@ -77,3 +77,34 @@ grpc_fuzzer(
"//test/core/promise:test_context",
],
)
grpc_cc_test(
name = "client_transport_test",
srcs = ["client_transport_test.cc"],
external_deps = [
"absl/functional:any_invocable",
"absl/strings:str_format",
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:grpc",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
"//src/core:join",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
"//test/core/promise:test_wakeup_schedulers",
],
)

@ -0,0 +1,394 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/chaotic_good/client_transport.h"
// IWYU pragma: no_include <sys/socket.h>
#include <stdio.h>
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <tuple>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
#include "absl/strings/str_format.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
using testing::Return;
using testing::StrictMock;
using testing::WithArgs;
namespace grpc_core {
namespace chaotic_good {
namespace testing {
class MockEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
MOCK_METHOD(
bool, Read,
(absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args),
(override));
MOCK_METHOD(
bool, Write,
(absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args),
(override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetPeerAddress, (), (const, override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetLocalAddress, (), (const, override));
};
class ClientTransportTest : public ::testing::Test {
public:
ClientTransportTest()
: control_endpoint_ptr_(new StrictMock<MockEndpoint>()),
data_endpoint_ptr_(new StrictMock<MockEndpoint>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test")),
control_endpoint_(*control_endpoint_ptr_),
data_endpoint_(*data_endpoint_ptr_),
event_engine_(std::make_shared<
grpc_event_engine::experimental::FuzzingEventEngine>(
[]() {
grpc_timer_manager_set_threading(false);
grpc_event_engine::experimental::FuzzingEventEngine::Options
options;
return options;
}(),
fuzzing_event_engine::Actions())),
client_transport_(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_),
SliceBuffer()),
std::static_pointer_cast<
grpc_event_engine::experimental::EventEngine>(event_engine_)),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()) {}
std::vector<MessageHandle> CreateMessages(int num_of_messages) {
std::vector<MessageHandle> messages;
for (int i = 0; i < num_of_messages; i++) {
SliceBuffer buffer;
buffer.Append(
Slice::FromCopiedString(absl::StrFormat("test message %d", i)));
auto message = arena_->MakePooled<Message>(std::move(buffer), 0);
messages.push_back(std::move(message));
}
return messages;
}
private:
MockEndpoint* control_endpoint_ptr_;
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
ClientTransport client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
const absl::Status kDummyErrorStatus =
absl::ErrnoToStatus(5566, "just an error");
static constexpr size_t kDummyRequestSize = 5566u;
};
TEST_F(ClientTransportTest, AddOneStream) {
auto messages = CreateMessages(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
auto messages = CreateMessages(1);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
auto activity = MakeActivity(
Seq(
// Concurrently: send message into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
// TODO(ladynana): change these expectations to errors after the
// writer activity closes transport for EE failures.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
auto messages = CreateMessages(3);
ClientMetadataHandle md;
auto args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
Join(Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])),
[this] {
this->pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
client_transport_.AddStream(std::move(args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreams) {
auto messages = CreateMessages(2);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
Join(
// Send message to first stream pipe.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
[this] {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send message to second stream pipe.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[1])),
[this] {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive message from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive message from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
auto messages = CreateMessages(6);
ClientMetadataHandle md;
auto first_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
auto second_stream_args = CallArgs{
std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
// Concurrently: send messages into the pipe, and receive from the
// pipe.
Join(
// Send messages to first stream pipe.
Seq(pipe_client_to_server_messages_.sender.Push(
std::move(messages[0])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[1])),
pipe_client_to_server_messages_.sender.Push(
std::move(messages[2])),
[this] {
pipe_client_to_server_messages_.sender.Close();
return absl::OkStatus();
}),
// Send messages to second stream pipe.
Seq(pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[3])),
pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[4])),
pipe_client_to_server_messages_second_.sender.Push(
std::move(messages[5])),
[this] {
pipe_client_to_server_messages_second_.sender.Close();
return absl::OkStatus();
}),
// Receive messages from first stream pipe.
client_transport_.AddStream(std::move(first_stream_args)),
// Receive messages from second stream pipe.
client_transport_.AddStream(std::move(second_stream_args))),
// Once complete, verify successful sending and the received value.
[](const std::tuple<absl::Status, absl::Status, absl::Status,
absl::Status>& ret) {
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
} // namespace testing
} // namespace chaotic_good
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
// Must call to create default EventEngine.
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -2183,6 +2183,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "client_transport_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save