Rewrite the server transport with gRPC Call 3.0 design.

pull/34728/head
Nana Pang 1 year ago
parent a26b767c60
commit 51aa3d4951
  1. build_autogenerated.yaml (1 change)
  2. src/core/BUILD (4 changes)
  3. src/core/ext/transport/chaotic_good/server_transport.cc (43 changes)
  4. src/core/ext/transport/chaotic_good/server_transport.h (169 changes)
  5. src/core/lib/promise/inter_activity_pipe.h (6 changes)
  6. test/core/transport/chaotic_good/BUILD (1 change)
  7. test/core/transport/chaotic_good/server_transport_test.cc (230 changes)

build_autogenerated.yaml
@@ -14674,7 +14674,6 @@ targets:
- src/core/ext/transport/chaotic_good/server_transport.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h

src/core/BUILD
@@ -6275,7 +6275,6 @@ grpc_cc_library(
"ext/transport/chaotic_good/server_transport.h",
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/random",
"absl/random:bit_gen_ref",
@@ -6287,18 +6286,17 @@ grpc_cc_library(
deps = [
"activity",
"arena",
"arena_promise",
"chaotic_good_frame",
"chaotic_good_frame_header",
"context",
"event_engine_wakeup_scheduler",
"grpc_promise_endpoint",
"join",
"loop",
"memory_quota",
"mpsc",
"pipe",
"resource_quota",
"seq",
"slice",
"slice_buffer",
"try_join",

src/core/ext/transport/chaotic_good/server_transport.cc
@@ -37,7 +37,6 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
@@ -52,13 +51,12 @@ namespace grpc_core {
namespace chaotic_good {
ServerTransport::ServerTransport(
absl::AnyInvocable<ArenaPromise<ServerMetadataHandle>(CallArgs)>
start_receive_callback,
std::unique_ptr<PromiseEndpoint> control_endpoint,
std::unique_ptr<PromiseEndpoint> data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
: outgoing_frames_(MpscReceiver<ServerFrame>(4)),
start_receive_callback_(std::move(start_receive_callback)),
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
AcceptFn accept_fn)
: accept_fn_(std::move(accept_fn)),
outgoing_frames_(MpscReceiver<ServerFrame>(4)),
control_endpoint_(std::move(control_endpoint)),
data_endpoint_(std::move(data_endpoint)),
control_endpoint_write_buffer_(SliceBuffer()),
@@ -121,9 +119,7 @@ ServerTransport::ServerTransport(
// Continuously write next outgoing frames to promise endpoints.
std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
[this](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
if (status.code() == absl::StatusCode::kInternal) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
}
},
@@ -167,26 +163,13 @@ ServerTransport::ServerTransport(
GPR_ASSERT(status.ok());
auto message = arena_->MakePooled<Message>(
std::move(data_endpoint_read_buffer_), 0);
// Construct call args for stream.
auto call_data = ConstructCallData(frame.frame_header.stream_id);
auto call_args =
CallArgs{std::move(frame.headers),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&call_data->pipe_server_intial_metadata_.sender,
&call_data->pipe_client_to_server_messages_.receiver,
&call_data->pipe_server_to_client_messages_.sender};
return Join(
// Push message into pipe_client_to_server_messages_.
call_data->pipe_client_to_server_messages_.sender.Push(
std::move(message)),
// Execute start_receive_callback.
start_receive_callback_(std::move(call_args)));
// Initialize call.
auto call_initiator = accept_fn_(*frame.headers);
AddCall(call_initiator);
return call_initiator.PushClientToServerMessage(std::move(message));
},
[](std::tuple<bool, ServerMetadataHandle> ret)
-> LoopCtl<absl::Status> {
// TODO(ladynana): figure out what to do with ServerMetadataHandle.
if (std::get<0>(ret)) {
[](bool ret) -> LoopCtl<absl::Status> {
if (ret) {
return Continue();
} else {
return absl::InternalError("Send message to pipe failed.");
@@ -197,9 +180,7 @@ ServerTransport::ServerTransport(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
[this](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
if (status.code() == absl::StatusCode::kInternal) {
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
this->AbortWithError();
}
},
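
For orientation, here is a minimal, self-contained sketch of the control flow the rewritten read loop now follows: each incoming fragment frame is handed to accept_fn, which returns a per-call handle, and the decoded message is pushed into that call. Every name below (CallHandle, ReadLoopStep, the std:: containers) is an illustrative stand-in, not one of the repo's actual types.

#include <cstdint>
#include <functional>
#include <queue>
#include <string>
#include <utility>

struct Message { std::string payload; };

// Stand-in for CallInitiator: owns the client-to-server message queue.
class CallHandle {
 public:
  explicit CallHandle(uint32_t stream_id) : stream_id_(stream_id) {}
  // Mirrors PushClientToServerMessage's promise resolving to bool.
  bool PushClientToServerMessage(Message m) {
    if (closed_) return false;
    inbox_.push(std::move(m));
    return true;
  }
  uint32_t stream_id() const { return stream_id_; }

 private:
  uint32_t stream_id_;
  bool closed_ = false;
  std::queue<Message> inbox_;
};

using AcceptFn = std::function<CallHandle(const std::string& client_metadata)>;

// One read-loop iteration: accept the call, then push the decoded message;
// returning false maps to the LoopCtl error branch above.
bool ReadLoopStep(AcceptFn& accept_fn, const std::string& client_metadata,
                  Message message) {
  CallHandle call = accept_fn(client_metadata);
  return call.PushClientToServerMessage(std::move(message));
}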

src/core/ext/transport/chaotic_good/server_transport.h
@@ -21,12 +21,13 @@
#include <cstddef>
#include <initializer_list> // IWYU pragma: keep
#include <map>
#include <memory>
#include <utility>
#include <variant>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
@@ -35,30 +36,79 @@
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
namespace chaotic_good {
// Prototype based on gRPC Call 3.0.
// TODO(ladynana): convert to the true Call/CallInitiator once available.
struct CallData {
uint32_t stream_id;
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
};
class CallInitiator {
public:
explicit CallInitiator(std::unique_ptr<CallData> call_data)
: call_(std::move(call_data)) {}
// Each method returns a promise that pushes/pulls a message or metadata
// through the corresponding pipe.
auto PushServerInitialMetadata(ServerMetadataHandle metadata) {
return call_->pipe_server_intial_metadata_.sender.Push(std::move(metadata));
}
auto PushServerToClientMessage(MessageHandle message) {
return call_->pipe_server_to_client_messages_.sender.Push(
std::move(message));
}
auto PushClientToServerMessage(MessageHandle message) {
return call_->pipe_client_to_server_messages_.sender.Push(
std::move(message));
}
auto PullServerInitialMetadata() {
return call_->pipe_server_intial_metadata_.receiver.Next();
}
auto PullServerToClientMessage() {
return call_->pipe_server_to_client_messages_.receiver.Next();
}
auto PullClientToServerMessage() {
return call_->pipe_client_to_server_messages_.receiver.Next();
}
uint32_t GetStreamId() { return call_->stream_id; }
void SetCall(std::unique_ptr<CallData> call_data) {
call_ = std::move(call_data);
}
template <typename Promise>
void Spawn(Promise p) {
// TODO(ladynana): call/implement Spawn method such as party->Spawn.
// party_->Spawn("Run promise", std::move(p), [](){return
// absl::OkStatus();});
}
private:
std::unique_ptr<CallData> call_;
};
class ServerTransport {
public:
ServerTransport(
absl::AnyInvocable<ArenaPromise<ServerMetadataHandle>(CallArgs)>
start_receive_callback,
std::unique_ptr<PromiseEndpoint> control_endpoint,
std::unique_ptr<PromiseEndpoint> data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine);
using AcceptFn = absl::AnyInvocable<CallInitiator(ClientMetadata&) const>;
ServerTransport(std::unique_ptr<PromiseEndpoint> control_endpoint,
std::unique_ptr<PromiseEndpoint> data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
AcceptFn accept_fn);
~ServerTransport() {
if (writer_ != nullptr) {
writer_.reset();
@@ -73,66 +123,59 @@ class ServerTransport {
if (!outgoing_frames_.IsClosed()) {
outgoing_frames_.MarkClosed();
}
std::map<uint32_t, std::shared_ptr<CallData>> stream_map;
{
MutexLock lock(&mu_);
stream_map = stream_map_;
}
for (const auto& pair : stream_map) {
auto call_data = pair.second;
call_data->pipe_client_to_server_messages_.receiver.CloseWithError();
call_data->pipe_server_intial_metadata_.sender.CloseWithError();
call_data->pipe_server_to_client_messages_.sender.CloseWithError();
}
// MutexLock lock(&mu_);
// for (const auto& pair : stream_map_) {
// if (!pair.second->IsClose()) {
// pair.second->MarkClose();
// }
// }
}
// Prototype of what start_receive_callback will do.
// ArenaPromise<ServerMetadataHandle> start_receive_callback (CallArgs
// callargs){
// return TrySeq(
// ProcessClientInitialMetadata(callargs.client_initial_metadata),
// ForEach(callargs.client_to_server_messages,
// [](MessageHandle message){
// ProcessClientMessage();
// }),
// // Send server initial metadata to client.
// callargs.server_initial_metadata->Push(md),
// // Send server message to client.
// callargs.server_to_client_messages->Push(md)
// );
// }
private:
struct CallData {
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
};
// Constructs the call data for each stream.
CallData* ConstructCallData(uint32_t stream_id) {
MutexLock lock(&mu_);
auto iter = stream_map_.find(stream_id);
if (iter != stream_map_.end()) {
return stream_map_[stream_id].get();
} else {
CallData call_data{Pipe<MessageHandle>(arena_.get()),
Pipe<MessageHandle>(arena_.get()),
Pipe<ServerMetadataHandle>(arena_.get())};
stream_map_[stream_id] = std::make_shared<CallData>(std::move(call_data));
return stream_map_[stream_id].get();
}
void AddCall(CallInitiator& r) {
// Add server write promise.
auto server_write = Loop([&r, this] {
return TrySeq(
// TODO(ladynana): add initial metadata in server frame.
r.PullServerToClientMessage(),
[this, stream_id = r.GetStreamId()](
NextResult<MessageHandle> result) mutable {
auto outgoing_frames = outgoing_frames_.MakeSender();
ServerFragmentFrame frame;
uint32_t message_length = result.value()->payload()->Length();
uint32_t message_padding = message_length % aligned_bytes;
frame.frame_header = FrameHeader{
FrameType::kFragment, {}, stream_id, 0, message_length,
message_padding, 0};
frame.message = std::move(*result);
return outgoing_frames.Send(ServerFrame(std::move(frame)));
},
[](bool success) -> LoopCtl<absl::Status> {
if (!success) {
// TODO(ladynana): propagate the actual error message
// from EventEngine.
return absl::UnavailableError(
"Transport closed due to endpoint write/read "
"failed.");
}
return Continue();
});
});
r.Spawn(std::move(server_write));
}
AcceptFn accept_fn_;
// The max buffer size is set to 4 so that each stream write queues
// at most 2 frames.
MpscReceiver<ServerFrame> outgoing_frames_;
static const size_t client_frame_queue_size_ = 2;
Mutex mu_;
uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
// Map of per-stream call data, keyed by stream_id.
std::map<uint32_t, std::shared_ptr<CallData>> stream_map_
ABSL_GUARDED_BY(mu_);
absl::AnyInvocable<ArenaPromise<ServerMetadataHandle>(CallArgs)>
start_receive_callback_;
// Alignment in bytes, assigned from the settings frame.
size_t aligned_bytes = 64;
// Mutex mu_;
// // Map of outgoing client frames, key is stream_id.
// std::map<uint32_t, std::shared_ptr<InterActivityPipe<
// ClientFrame, client_frame_queue_size_>::Receiver>>
// stream_map_
// ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;
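
Taken together, the new surface is wired up roughly as follows. This is a hedged sketch using only the types introduced in this header; control_endpoint, data_endpoint, event_engine, and arena are assumed to be constructed elsewhere (compare InitialServerTransport in the test below).

auto accept_fn = [&](ClientMetadata& md) {
  // One CallData (and its three pipes) per accepted stream; stream id 1 is
  // a placeholder here.
  CallData call_data{1, Pipe<MessageHandle>(arena.get()),
                     Pipe<MessageHandle>(arena.get()),
                     Pipe<ServerMetadataHandle>(arena.get())};
  return CallInitiator(std::make_unique<CallData>(std::move(call_data)));
};
auto transport = std::make_unique<ServerTransport>(
    std::move(control_endpoint), std::move(data_endpoint),
    std::move(event_engine), std::move(accept_fn));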

src/core/lib/promise/inter_activity_pipe.h
@@ -146,6 +146,12 @@ class InterActivityPipe {
return [center = center_]() { return center->Next(); };
}
bool IsClose() { return center_->IsClosed(); }
void MarkClose() {
if (center_ != nullptr) center_->MarkClosed();
}
private:
RefCountedPtr<Center> center_;
};
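
The new IsClose()/MarkClose() pair backs the (currently commented-out) shutdown path in AbortWithError above. A sketch of the intended usage, with receiver as a hypothetical InterActivityPipe<ClientFrame, kQueueSize>::Receiver handle:

if (!receiver->IsClose()) {
  receiver->MarkClose();  // safe to repeat: MarkClose() checks center_ first
}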

test/core/transport/chaotic_good/BUILD
@@ -183,7 +183,6 @@ grpc_cc_test(
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",

test/core/transport/chaotic_good/server_transport_test.cc
@@ -41,11 +41,9 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
@@ -108,54 +106,103 @@ class ServerTransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)) {}
// Adds expectations for the control/data endpoints' write/read operations.
void AddExpectations(int num_of_streams, int successful_write_messages,
int successful_read_messages, bool expect_write_failed,
bool expect_read_failed) {
for (int i = 1; i <= num_of_streams; i++) {
AddReadExpectations(/*stream_id*/ i, successful_read_messages,
expect_read_failed);
}
if (expect_read_failed) {
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_read_sequence)
.WillOnce(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_read) {
// Mock EventEngine endpoint read fails.
on_read(absl::InternalError("control endpoint read failed."));
return false;
}));
} else {
// reader_ stays pending on the next read.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_read_sequence);
}
}
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()) {}
void InitialServerTransport() {
// Read expectations must be added before transport initialization, since
// the reader_ activity loop starts in the ServerTransport constructor.
auto accept_fn = [this](ClientMetadata& md) {
CallData call_data{1, Pipe<MessageHandle>(arena_.get()),
Pipe<MessageHandle>(arena_.get()),
Pipe<ServerMetadataHandle>(arena_.get())};
CallInitiator call_initiator(
std::make_unique<CallData>(std::move(call_data)));
return call_initiator;
};
server_transport_ = std::make_unique<ServerTransport>(
std::move(start_receive_callback_),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
std::static_pointer_cast<grpc_event_engine::experimental::EventEngine>(
event_engine_));
event_engine_),
std::move(accept_fn));
}
// Create client to server test messages.
std::vector<MessageHandle> CreateMessages(int num_of_messages) {
std::vector<MessageHandle> messages;
for (int i = 0; i < num_of_messages; i++) {
SliceBuffer buffer;
buffer.Append(
Slice::FromCopiedString(absl::StrFormat("test message %d", i)));
auto message = arena_->MakePooled<Message>(std::move(buffer), 0);
messages.push_back(std::move(message));
}
return messages;
void AddReadExpectation() {
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<0, 1>(
[](absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer) mutable {
// Construct test frame for EventEngine read: headers (26
// bytes), message (8 bytes), message padding (56 bytes),
// trailers (0 bytes).
const std::string frame_header = {
static_cast<char>(0x80), // frame type = fragment
0x01, // flag = has header
0x00,
0x00,
0x01, // stream id = 1
0x00,
0x00,
0x00,
0x1a, // header length = 26
0x00,
0x00,
0x00,
0x08, // message length = 8
0x00,
0x00,
0x00,
0x38, // message padding = 56
0x00,
0x00,
0x00,
0x00, // trailer length = 0
0x00,
0x00,
0x00};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(frame_header));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence)
.WillOnce(WithArgs<1>(
[](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Encoded string of header ":path: /demo.Service/Step".
const std::string header = {
0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(header));
buffer->Append(std::move(slice));
return true;
}));
// EXPECT_CALL(control_endpoint_, Read)
// .InSequence(control_endpoint_sequence)
// .WillOnce(Return(false));
EXPECT_CALL(data_endpoint_, Read)
.InSequence(data_endpoint_sequence)
.WillOnce(WithArgs<1>(
[this](grpc_event_engine::experimental::SliceBuffer* buffer) {
const std::string message_padding = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(message_padding + message_));
buffer->Append(std::move(slice));
return true;
}));
}
private:
@@ -163,91 +210,8 @@ class ServerTransportTest : public ::testing::Test {
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
Sequence control_endpoint_write_sequence;
Sequence control_endpoint_read_sequence;
Sequence data_endpoint_write_sequence;
Sequence data_endpoint_read_sequence;
void AddReadExpectations(int stream_id, int successful_read_messages,
bool failed_at_last) {
if (successful_read_messages > 0) {
// Transport starts read.
for (int i = 1; i <= successful_read_messages; i++) {
// Only the last message returns a trailer.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_read_sequence)
.WillOnce(WithArgs<1>(
[stream_id](grpc_event_engine::experimental::SliceBuffer*
buffer) mutable {
// Construct test frame for EventEngine read: headers (26
// bytes), message (8 bytes), message padding (56 bytes),
// trailers (0 bytes).
const std::string frame_header = {
static_cast<char>(0x80), // frame type = fragment
static_cast<char>(0x01), // flag = has header
0x00,
0x00,
static_cast<char>(stream_id), // stream id >= 1
0x00,
0x00,
0x00,
0x1a, // header length = 26
0x00,
0x00,
0x00,
0x08, // message length = 8
0x00,
0x00,
0x00,
0x38, // message padding = 56
0x00,
0x00,
0x00,
static_cast<char>(0x00), // trailer length = 0
0x00,
0x00,
0x00};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(frame_header));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_read_sequence)
.WillOnce(WithArgs<1>(
[](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Encoded string of header ":path: /demo.Service/Step".
const std::string header = {
0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(header));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(data_endpoint_, Read)
.InSequence(data_endpoint_read_sequence)
.WillOnce(WithArgs<1>(
[this](grpc_event_engine::experimental::SliceBuffer* buffer) {
const std::string message_padding = {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(message_padding + message));
buffer->Append(std::move(slice));
return true;
}));
}
}
}
Sequence control_endpoint_sequence;
Sequence data_endpoint_sequence;
protected:
MockEndpoint& control_endpoint_;
@@ -256,19 +220,15 @@ class ServerTransportTest : public ::testing::Test {
event_engine_;
std::unique_ptr<ServerTransport> server_transport_;
ScopedArenaPtr arena_;
absl::AnyInvocable<ArenaPromise<ServerMetadataHandle>(CallArgs)>
start_receive_callback_ = [](CallArgs call_args) {
return Seq(call_args.client_to_server_messages->Next(),
[] { return ServerMetadataFromStatus(absl::OkStatus()); });
};
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added to verify received message payload.
const std::string message = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ServerTransportTest, ReadOneMessage) {
AddExpectations(/*num_of_streams*/ 1, /*successful_write_messages*/ 1,
/*successful_read_messages*/ 1, /*expect_write_failed*/ false,
/*expect_read_failed*/ false);
AddReadExpectation();
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(

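As a companion to the hand-written byte arrays above, here is a self-contained helper that reproduces the 24-byte frame header layout implied by the test bytes: one frame-type byte, one flags byte, two zero bytes, then five little-endian uint32 fields (the split of the first four bytes is inferred from the test's comments). This is an illustrative reconstruction, not the repo's actual frame-header encoder.

#include <cstdint>
#include <cstring>
#include <string>

std::string EncodeFrameHeader(uint8_t frame_type, uint8_t flags,
                              uint32_t stream_id, uint32_t header_length,
                              uint32_t message_length,
                              uint32_t message_padding,
                              uint32_t trailer_length) {
  std::string out(24, '\0');
  out[0] = static_cast<char>(frame_type);  // 0x80 = fragment
  out[1] = static_cast<char>(flags);       // 0x01 = has header
  const uint32_t fields[5] = {stream_id, header_length, message_length,
                              message_padding, trailer_length};
  for (int i = 0; i < 5; ++i) {
    // Assumes a little-endian host, matching the byte order in the test.
    std::memcpy(&out[4 + 4 * i], &fields[i], sizeof(uint32_t));
  }
  return out;
}

// The header in AddReadExpectation is then
// EncodeFrameHeader(0x80, 0x01, 1, 26, 8, 56, 0).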