[chaotic-good] Add client transport error handling. (#34611)

This is a follow-up PR of #34191, which handles the error condition of
endpoints failed to write/read in chaotic-good client transport.

This PR needs to be merged after #34191.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/34905/head
nanahpang 1 year ago committed by GitHub
parent e58268525a
commit 1e15d00ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      CMakeLists.txt
  2. 30
      build_autogenerated.yaml
  3. 1
      src/core/BUILD
  4. 19
      src/core/ext/transport/chaotic_good/client_transport.cc
  5. 92
      src/core/ext/transport/chaotic_good/client_transport.h
  6. 11
      src/core/lib/promise/inter_activity_pipe.h
  7. 10
      src/core/lib/promise/mpsc.h
  8. 38
      test/core/transport/chaotic_good/BUILD
  9. 441
      test/core/transport/chaotic_good/client_transport_error_test.cc
  10. 24
      tools/run_tests/generated/tests.json

44
CMakeLists.txt generated

@ -957,6 +957,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_ssl_test)
endif()
add_dependencies(buildtests_cxx client_streaming_test)
add_dependencies(buildtests_cxx client_transport_error_test)
add_dependencies(buildtests_cxx client_transport_test)
add_dependencies(buildtests_cxx cmdline_test)
add_dependencies(buildtests_cxx codegen_test_full)
@ -9405,6 +9406,49 @@ target_link_libraries(client_streaming_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(client_transport_error_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
src/core/ext/transport/chaotic_good/client_transport.cc
src/core/ext/transport/chaotic_good/frame.cc
src/core/ext/transport/chaotic_good/frame_header.cc
src/core/lib/transport/promise_endpoint.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/transport/chaotic_good/client_transport_error_test.cc
)
target_compile_features(client_transport_error_test PUBLIC cxx_std_14)
target_include_directories(client_transport_error_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(client_transport_error_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -7093,6 +7093,36 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: client_transport_error_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/chaotic_good/client_transport.h
- src/core/ext/transport/chaotic_good/frame.h
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc
- src/core/ext/transport/chaotic_good/frame.cc
- src/core/ext/transport/chaotic_good/frame_header.cc
- src/core/lib/transport/promise_endpoint.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/transport/chaotic_good/client_transport_error_test.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: client_transport_test
gtest: true
build: test

@ -6227,6 +6227,7 @@ grpc_cc_library(
"arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
"context",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",

@ -63,6 +63,7 @@ ClientTransport::ClientTransport(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"client_transport")),
arena_(MakeScopedArena(1024, &memory_allocator_)),
context_(arena_.get()),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
return TrySeq(
@ -111,11 +112,10 @@ ClientTransport::ClientTransport(
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint write failures with
// outgoing_frames.close() once available.
[this](absl::Status status) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
}
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
@ -176,11 +176,10 @@ ClientTransport::ClientTransport(
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
// TODO(ladynana): handle the promise endpoint read failures with
// iterating stream_map_ and close all the pipes once available.
[this](absl::Status status) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
}
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());

@ -34,7 +34,6 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
@ -42,6 +41,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
@ -75,6 +75,19 @@ class ClientTransport {
reader_.reset();
}
}
void AbortWithError() {
// Mark transport as unavailable when the endpoint write/read failed.
// Close all the available pipes.
if (!outgoing_frames_.IsClosed()) {
outgoing_frames_.MarkClosed();
}
MutexLock lock(&mu_);
for (const auto& pair : stream_map_) {
if (!pair.second->IsClose()) {
pair.second->MarkClose();
}
}
}
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
@ -119,8 +132,11 @@ class ClientTransport {
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
return absl::InternalError(
"Send frame to outgoing_frames failed.");
// TODO(ladynana): propagate the actual error message
// from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return absl::OkStatus();
});
@ -137,38 +153,51 @@ class ClientTransport {
// Save incomming frame results to call_args.
[server_initial_metadata, server_to_client_messages](
absl::optional<ServerFrame> server_frame) mutable {
GPR_ASSERT(server_frame.has_value());
auto frame = std::move(
absl::get<ServerFragmentFrame>(*server_frame));
bool transport_closed = false;
ServerFragmentFrame frame;
if (!server_frame.has_value()) {
// Incoming server frame pipe is closed, which only
// happens when transport is aborted.
transport_closed = true;
} else {
frame = std::move(
absl::get<ServerFragmentFrame>(*server_frame));
};
bool has_headers = (frame.headers != nullptr);
bool has_message = (frame.message != nullptr);
bool has_trailers = (frame.trailers != nullptr);
return TrySeq(
If(
has_headers,
[server_initial_metadata,
headers = std::move(frame.headers)]() mutable {
return server_initial_metadata->Push(
std::move(headers));
},
[] { return false; }),
If(
has_message,
[server_to_client_messages,
message = std::move(frame.message)]() mutable {
return server_to_client_messages->Push(
std::move(message));
},
[] { return false; }),
If(
has_trailers,
[trailers = std::move(frame.trailers)]() mutable
-> LoopCtl<ServerMetadataHandle> {
return std::move(trailers);
},
[]() -> LoopCtl<ServerMetadataHandle> {
return Continue();
}));
If((!transport_closed) && has_headers,
[server_initial_metadata,
headers = std::move(frame.headers)]() mutable {
return server_initial_metadata->Push(
std::move(headers));
},
[] { return false; }),
If((!transport_closed) && has_message,
[server_to_client_messages,
message = std::move(frame.message)]() mutable {
return server_to_client_messages->Push(
std::move(message));
},
[] { return false; }),
If((!transport_closed) && has_trailers,
[trailers = std::move(frame.trailers)]() mutable
-> LoopCtl<ServerMetadataHandle> {
return std::move(trailers);
},
[transport_closed]()
-> LoopCtl<ServerMetadataHandle> {
if (transport_closed) {
// TODO(ladynana): propagate the actual error
// message from EventEngine.
return ServerMetadataFromStatus(
absl::UnavailableError(
"Transport closed due to endpoint "
"write/read failed."));
}
return Continue();
}));
});
})),
[](std::tuple<Empty, ServerMetadataHandle> ret) {
@ -204,6 +233,7 @@ class ClientTransport {
std::shared_ptr<FrameHeader> frame_header_;
MemoryAllocator memory_allocator_;
ScopedArenaPtr arena_;
promise_detail::Context<Arena> context_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};

@ -83,6 +83,11 @@ class InterActivityPipe {
on_available.Wakeup();
}
bool IsClosed() {
MutexLock lock(&mu_);
return closed_;
}
private:
Mutex mu_;
std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
@ -108,6 +113,12 @@ class InterActivityPipe {
if (center_ != nullptr) center_->MarkClosed();
}
bool IsClose() { return center_->IsClosed(); }
void MarkClose() {
if (center_ != nullptr) center_->MarkClosed();
}
auto Push(T value) {
return [center = center_, value = std::move(value)]() mutable {
return center->Push(value);

@ -107,6 +107,12 @@ class Center : public RefCounted<Center<T>> {
receiver_closed_ = true;
}
// Return whether the receiver is closed.
bool IsClosed() {
MutexLock lock(&mu_);
return receiver_closed_;
}
private:
Mutex mu_;
const size_t max_queued_;
@ -164,6 +170,10 @@ class MpscReceiver {
~MpscReceiver() {
if (center_ != nullptr) center_->ReceiverClosed();
}
bool IsClosed() { return center_->IsClosed(); }
void MarkClosed() {
if (center_ != nullptr) center_->ReceiverClosed();
}
MpscReceiver(const MpscReceiver&) = delete;
MpscReceiver& operator=(const MpscReceiver&) = delete;
// Only movable until it's first polled, and so we don't need to contend with

@ -118,3 +118,41 @@ grpc_cc_test(
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
],
)
grpc_cc_test(
name = "client_transport_error_test",
srcs = ["client_transport_error_test.cc"],
external_deps = [
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings:str_format",
"absl/types:optional",
"gtest",
],
language = "C++",
uses_event_engine = False,
uses_polling = False,
deps = [
"//:grpc",
"//:grpc_public_hdrs",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
"//src/core:event_engine_wakeup_scheduler",
"//src/core:grpc_promise_endpoint",
"//src/core:if",
"//src/core:join",
"//src/core:loop",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
],
)

@ -0,0 +1,441 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/status/status.h"
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/lib/transport/transport.h"
// IWYU pragma: no_include <sys/socket.h>
#include <stddef.h>
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <string> // IWYU pragma: keep
#include <tuple>
#include <utility>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep
#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
using testing::MockFunction;
using testing::Return;
using testing::Sequence;
using testing::StrictMock;
using testing::WithArgs;
namespace grpc_core {
namespace chaotic_good {
namespace testing {
class MockEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
MOCK_METHOD(
bool, Read,
(absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args),
(override));
MOCK_METHOD(
bool, Write,
(absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args),
(override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetPeerAddress, (), (const, override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetLocalAddress, (), (const, override));
};
class ClientTransportTest : public ::testing::Test {
public:
ClientTransportTest()
: control_endpoint_ptr_(new StrictMock<MockEndpoint>()),
data_endpoint_ptr_(new StrictMock<MockEndpoint>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test")),
control_endpoint_(*control_endpoint_ptr_),
data_endpoint_(*data_endpoint_ptr_),
event_engine_(std::make_shared<
grpc_event_engine::experimental::FuzzingEventEngine>(
[]() {
grpc_timer_manager_set_threading(false);
grpc_event_engine::experimental::FuzzingEventEngine::Options
options;
return options;
}(),
fuzzing_event_engine::Actions())),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()),
pipe_server_to_client_messages_second_(arena_.get()),
pipe_server_intial_metadata_second_(arena_.get()) {}
// Initial ClientTransport with read expecations
void InitialClientTransport() {
client_transport_ = std::make_unique<ClientTransport>(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
event_engine_);
}
// Send messages from client to server.
auto SendClientToServerMessages(
Pipe<MessageHandle>& pipe_client_to_server_messages,
int num_of_messages) {
return Loop([&pipe_client_to_server_messages, num_of_messages,
this]() mutable {
bool has_message = (num_of_messages > 0);
return If(
has_message,
Seq(pipe_client_to_server_messages.sender.Push(
arena_->MakePooled<Message>()),
[&num_of_messages]() -> LoopCtl<absl::Status> {
num_of_messages--;
return Continue();
}),
[&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
pipe_client_to_server_messages.sender.Close();
return absl::OkStatus();
});
});
}
// Add stream into client transport, and expect return trailers of
// "grpc-status:code".
auto AddStream(CallArgs args) {
return client_transport_->AddStream(std::move(args));
}
private:
MockEndpoint* control_endpoint_ptr_;
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
Pipe<MessageHandle> pipe_server_to_client_messages_second_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
absl::AnyInvocable<void(absl::Status)> read_callback_;
Sequence control_endpoint_sequence_;
Sequence data_endpoint_sequence_;
// Added to verify received message payload.
const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
// Mock write failed and read is pending.
EXPECT_CALL(control_endpoint_, Write)
.WillOnce(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.WillOnce(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("data endpoint write failed."));
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(Return(false));
InitialClientTransport();
ClientMetadataHandle md;
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages in client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(args)),
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
// Mock read failed.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(WithArgs<0>(
[](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
on_read(absl::InternalError("control endpoint read failed."));
// Return false to mock EventEngine read not finish.
return false;
}));
InitialClientTransport();
ClientMetadataHandle md;
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages in client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(args)),
// Send messages to call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
// Mock write failed at first stream and second stream's write will fail too.
EXPECT_CALL(control_endpoint_, Write)
.Times(1)
.WillRepeatedly(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.Times(1)
.WillRepeatedly(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("data endpoint write failed."));
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(Return(false));
InitialClientTransport();
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages from client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(first_stream_args)),
// Send messages to first stream's
// call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
},
Join(
// Add second stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(second_stream_args)),
// Send messages to second stream's
// call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_second_,
1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
// Mock read failed at first stream, and second stream's write will fail too.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(WithArgs<0>(
[](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
on_read(absl::InternalError("control endpoint read failed."));
// Return false to mock EventEngine read not finish.
return false;
}));
InitialClientTransport();
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages from client transport.
Join(
// Add first stream with call_args into client transport.
AddStream(std::move(first_stream_args)),
// Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
},
Join(
// Add second stream with call_args into client transport.
AddStream(std::move(second_stream_args)),
// Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_second_,
1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
} // namespace testing
} // namespace chaotic_good
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
// Must call to create default EventEngine.
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -2205,6 +2205,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "client_transport_error_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save