diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 8c5bc155ed6..54358bb3891 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6321,6 +6321,7 @@ targets: - src/core/lib/promise/detail/seq_state.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h - src/core/lib/promise/if.h - src/core/lib/promise/latch.h - src/core/lib/promise/loop.h diff --git a/src/core/BUILD b/src/core/BUILD index 38b1050695e..a6b01f761c4 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -8162,6 +8162,7 @@ grpc_cc_library( "call_final_info", "call_state", "dump_args", + "for_each", "if", "latch", "map", diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 3cb965aab7f..b3c266b578c 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -77,8 +77,8 @@ struct NextValueTraits> { return NextValueType::kEndOfStream; } - GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value& MutableValue(T& t) { - return *t; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue(T& t) { + return std::move(*t); } }; @@ -95,9 +95,9 @@ struct NextValueTraits>> { return NextValueType::kError; } - GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value& MutableValue( + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue( ValueOrFailure>& t) { - return **t; + return std::move(**t); } }; @@ -179,7 +179,7 @@ class ForEach { << DebugTag() << " PollReaderNext: got value"; Destruct(&reader_next_); auto action = action_factory_.Make( - std::move(NextValueTraits::MutableValue(*p))); + NextValueTraits::TakeValue(*p)); Construct(&in_action_, std::move(action), std::move(*p)); reading_next_ = false; return PollAction(); diff --git a/src/core/lib/surface/call_utils.cc b/src/core/lib/surface/call_utils.cc index e5949d9d200..41e122b03db 100644 --- a/src/core/lib/surface/call_utils.cc +++ b/src/core/lib/surface/call_utils.cc @@ -166,47 +166,6 @@ std::string WaitForCqEndOp::StateString(const State& state) { [](const Invalid&) -> std::string { return "Invalid{}"; }); } -//////////////////////////////////////////////////////////////////////// -// MessageReceiver - -StatusFlag MessageReceiver::FinishRecvMessage( - ValueOrFailure> result) { - if (!result.ok()) { - GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received end-of-stream with error"; - *recv_message_ = nullptr; - recv_message_ = nullptr; - return Failure{}; - } - if (!result->has_value()) { - GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received end-of-stream"; - *recv_message_ = nullptr; - recv_message_ = nullptr; - return Success{}; - } - MessageHandle& message = **result; - test_only_last_message_flags_ = message->flags(); - if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && - (incoming_compression_algorithm_ != GRPC_COMPRESS_NONE)) { - *recv_message_ = grpc_raw_compressed_byte_buffer_create( - nullptr, 0, incoming_compression_algorithm_); - } else { - *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); - } - grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), - &(*recv_message_)->data.raw.slice_buffer); - GRPC_TRACE_LOG(call, INFO) - << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received " - << (*recv_message_)->data.raw.slice_buffer.length << " byte message"; - recv_message_ = nullptr; - return Success{}; -} - //////////////////////////////////////////////////////////////////////// // MakeErrorString diff --git a/src/core/lib/surface/call_utils.h b/src/core/lib/surface/call_utils.h index 269e2f25178..16d2a174a3a 100644 --- a/src/core/lib/surface/call_utils.h +++ b/src/core/lib/surface/call_utils.h @@ -422,15 +422,51 @@ class MessageReceiver { recv_message_ = op.data.recv_message.recv_message; return [this, puller]() mutable { return Map(puller->PullMessage(), - [this](ValueOrFailure> msg) { + [this](typename Puller::NextMessage msg) { return FinishRecvMessage(std::move(msg)); }); }; } private: - StatusFlag FinishRecvMessage( - ValueOrFailure> result); + template + StatusFlag FinishRecvMessage(NextMessage result) { + if (!result.ok()) { + GRPC_TRACE_LOG(call, INFO) + << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received end-of-stream with error"; + *recv_message_ = nullptr; + recv_message_ = nullptr; + return Failure{}; + } + if (!result.has_value()) { + GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received end-of-stream"; + *recv_message_ = nullptr; + recv_message_ = nullptr; + return Success{}; + } + MessageHandle message = result.TakeValue(); + test_only_last_message_flags_ = message->flags(); + if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && + (incoming_compression_algorithm_ != GRPC_COMPRESS_NONE)) { + *recv_message_ = grpc_raw_compressed_byte_buffer_create( + nullptr, 0, incoming_compression_algorithm_); + } else { + *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); + } + grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), + &(*recv_message_)->data.raw.slice_buffer); + GRPC_TRACE_LOG(call, INFO) + << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received " + << (*recv_message_)->data.raw.slice_buffer.length << " byte message"; + recv_message_ = nullptr; + return Success{}; + } grpc_byte_buffer** recv_message_ = nullptr; uint32_t test_only_last_message_flags_ = 0; diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index bbdfa4a2fed..27c2a75d60b 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -24,6 +24,7 @@ #include #include "absl/log/check.h" +#include "src/core/lib/promise/for_each.h" #include "src/core/lib/promise/if.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/map.h" @@ -121,6 +122,104 @@ struct NoInterceptor {}; namespace filters_detail { +// Flow control across pipe stages. +// This ends up being exceedingly subtle - essentially we need to ensure that +// across a series of pipes we have no more than one outstanding message at a +// time - but those pipes are for the most part independent. +// How we achieve this is that this NextMessage object holds both the message +// and a completion token - the last owning NextMessage instance will call +// the on_progress method on the referenced CallState - and at that point that +// CallState will allow the next message to be sent through it. +// Next, the ForEach promise combiner explicitly holds onto the wrapper object +// owning the result (this object) and extracts the message from it, but doesn't +// dispose that instance until the action promise for the ForEach iteration +// completes, ensuring most callers need do nothing special to have the +// flow control work correctly. +template +class NextMessage { + public: + ~NextMessage() { + if (message_ != end_of_stream() && message_ != error() && + message_ != taken()) { + delete message_; + } + if (call_state_ != nullptr) { + (call_state_->*on_progress)(); + } + } + + NextMessage() = default; + explicit NextMessage(Failure) : message_(error()), call_state_(nullptr) {} + NextMessage(MessageHandle message, CallState* call_state) { + DCHECK_NE(call_state, nullptr); + DCHECK_NE(message.get(), nullptr); + DCHECK(message.get_deleter().has_freelist()); + message_ = message.release(); + call_state_ = call_state; + } + NextMessage(const NextMessage& other) = delete; + NextMessage& operator=(const NextMessage& other) = delete; + NextMessage(NextMessage&& other) noexcept + : message_(std::exchange(other.message_, taken())), + call_state_(std::exchange(other.call_state_, nullptr)) {} + NextMessage& operator=(NextMessage&& other) noexcept { + if (message_ != end_of_stream() && message_ != error() && + message_ != taken()) { + delete message_; + } + if (call_state_ != nullptr) { + (call_state_->*on_progress)(); + } + message_ = std::exchange(other.message_, taken()); + call_state_ = std::exchange(other.call_state_, nullptr); + return *this; + } + + bool ok() const { + DCHECK_NE(message_, taken()); + return message_ != error(); + } + bool has_value() const { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + return message_ != end_of_stream(); + } + StatusFlag status() const { return StatusFlag(ok()); } + Message& value() { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + DCHECK(has_value()); + return *message_; + } + MessageHandle TakeValue() { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + DCHECK(has_value()); + return MessageHandle(std::exchange(message_, taken()), + Arena::PooledDeleter()); + } + bool progressed() const { return call_state_ == nullptr; } + void Progress() { + DCHECK(!progressed()); + (call_state_->*on_progress)(); + call_state_ = nullptr; + } + + private: + static Message* end_of_stream() { return nullptr; } + static Message* error() { return reinterpret_cast(1); } + static Message* taken() { return reinterpret_cast(2); } + Message* message_ = end_of_stream(); + CallState* call_state_ = nullptr; +}; + +template +struct ArgumentMustBeNextMessage; +template +struct ArgumentMustBeNextMessage> { + static constexpr bool value() { return true; } +}; + inline void* Offset(void* base, size_t amt) { return static_cast(base) + amt; } @@ -1301,6 +1400,80 @@ const NoInterceptor ClientInitialMetadataInterceptor::Call::OnFinalize; } // namespace filters_detail +namespace for_each_detail { +template +struct NextValueTraits> { + using NextMsg = filters_detail::NextMessage; + using Value = MessageHandle; + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type( + const NextMsg& t) { + if (!t.ok()) return NextValueType::kError; + if (t.has_value()) return NextValueType::kValue; + return NextValueType::kEndOfStream; + } + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static MessageHandle TakeValue( + NextMsg& t) { + return t.TakeValue(); + } +}; +} // namespace for_each_detail + +template +struct FailureStatusCastImpl, + StatusFlag> { + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static filters_detail::NextMessage< + on_progress> + Cast(StatusFlag flag) { + DCHECK_EQ(flag, Failure{}); + return filters_detail::NextMessage(Failure{}); + } +}; + +namespace promise_detail { +template +struct TrySeqTraitsWithSfinae> { + using UnwrappedType = MessageHandle; + using WrappedType = filters_detail::NextMessage; + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallFactory( + Next* next, WrappedType&& value) { + return next->Make(value.TakeValue()); + } + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk( + const WrappedType& value) { + return value.ok(); + } + static const char* ErrorString(const WrappedType& status) { + DCHECK(!status.ok()); + return "failed"; + } + template + static R ReturnValue(WrappedType&& status) { + DCHECK(!status.ok()); + return WrappedType(Failure{}); + } + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallSeqFactory( + F& f, Elem&& elem, WrappedType value) + -> decltype(f(std::forward(elem), std::declval())) { + return f(std::forward(elem), value.TakeValue()); + } + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Poll + CheckResultAndRunNext(WrappedType prior, RunNext run_next) { + if (!prior.ok()) return WrappedType(prior.status()); + return run_next(std::move(prior)); + } +}; +} // namespace promise_detail + +using ServerToClientNextMessage = + filters_detail::NextMessage<&CallState::FinishPullServerToClientMessage>; +using ClientToServerNextMessage = + filters_detail::NextMessage<&CallState::FinishPullClientToServerMessage>; + // Execution environment for a stack of filters. // This is a per-call object. class CallFilters { @@ -1415,10 +1588,10 @@ class CallFilters { Input(CallFilters::*input_location), filters_detail::Layout(filters_detail::StackData::*layout), void (CallState::*on_done)(), typename StackIterator> - class Executor { + class MetadataExecutor { public: - Executor(CallFilters* filters, StackIterator stack_begin, - StackIterator stack_end) + MetadataExecutor(CallFilters* filters, StackIterator stack_begin, + StackIterator stack_end) : stack_current_(stack_begin), stack_end_(stack_end), filters_(filters) { @@ -1466,17 +1639,72 @@ class CallFilters { filters_detail::OperationExecutor executor_; }; + template ( + filters_detail::StackData::*layout), + void (CallState::*on_done)(), typename StackIterator> + class MessageExecutor { + public: + using NextMsg = filters_detail::NextMessage; + + MessageExecutor(CallFilters* filters, StackIterator stack_begin, + StackIterator stack_end) + : stack_current_(stack_begin), + stack_end_(stack_end), + filters_(filters) { + DCHECK_NE((filters_->*input_location).get(), nullptr); + } + + Poll operator()() { + if ((filters_->*input_location) != nullptr) { + if (stack_current_ == stack_end_) { + DCHECK_NE((filters_->*input_location).get(), nullptr); + return NextMsg(std::move(filters_->*input_location), + &filters_->call_state_); + } + return FinishStep(executor_.Start( + &(stack_current_->stack->data_.*layout), + std::move(filters_->*input_location), filters_->call_data_)); + } else { + return FinishStep(executor_.Step(filters_->call_data_)); + } + } + + private: + Poll FinishStep(Poll> p) { + auto* r = p.value_if_ready(); + if (r == nullptr) return Pending{}; + if (r->ok != nullptr) { + ++stack_current_; + if (stack_current_ == stack_end_) { + return NextMsg{std::move(r->ok), &filters_->call_state_}; + } + return FinishStep( + executor_.Start(&(stack_current_->stack->data_.*layout), + std::move(r->ok), filters_->call_data_)); + } + (filters_->call_state_.*on_done)(); + filters_->PushServerTrailingMetadata(std::move(r->error)); + return Failure{}; + } + + StackIterator stack_current_; + StackIterator stack_end_; + CallFilters* filters_; + filters_detail::OperationExecutor executor_; + }; + public: // Client: Fetch client initial metadata // Returns a promise that resolves to ValueOrFailure GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { call_state_.BeginPullClientInitialMetadata(); - return Executor(this, stacks_.cbegin(), - stacks_.cend()); + return MetadataExecutor( + this, stacks_.cbegin(), stacks_.cend()); } // Server: Push server initial metadata // Returns a promise that resolves to a StatusFlag indicating success @@ -1496,7 +1724,7 @@ class CallFilters { has_server_initial_metadata, [this]() { return Map( - Executor< + MetadataExecutor< absl::optional, ServerMetadataHandle, &CallFilters::push_server_initial_metadata_, @@ -1526,7 +1754,7 @@ class CallFilters { // Client: Indicate that no more messages will be sent void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); } // Server: Fetch client to server message - // Returns a promise that resolves to ValueOrFailure + // Returns a promise that resolves to ClientToServerNextMessage GRPC_MUST_USE_RESULT auto PullClientToServerMessage() { return TrySeq( [this]() { @@ -1536,16 +1764,15 @@ class CallFilters { return If( message_available, [this]() { - return Executor< - absl::optional, MessageHandle, + return MessageExecutor< &CallFilters::push_client_to_server_message_, &filters_detail::StackData::client_to_server_messages, &CallState::FinishPullClientToServerMessage, StacksVector::const_iterator>(this, stacks_.cbegin(), stacks_.cend()); }, - []() -> ValueOrFailure> { - return absl::optional(); + []() -> ClientToServerNextMessage { + return ClientToServerNextMessage(); }); }); } @@ -1557,7 +1784,7 @@ class CallFilters { return [this]() { return call_state_.PollPushServerToClientMessage(); }; } // Server: Fetch server to client message - // Returns a promise that resolves to ValueOrFailure + // Returns a promise that resolves to ServerToClientNextMessage GRPC_MUST_USE_RESULT auto PullServerToClientMessage() { return TrySeq( [this]() { @@ -1567,16 +1794,15 @@ class CallFilters { return If( message_available, [this]() { - return Executor< - absl::optional, MessageHandle, + return MessageExecutor< &CallFilters::push_server_to_client_message_, &filters_detail::StackData::server_to_client_messages, &CallState::FinishPullServerToClientMessage, StacksVector::const_reverse_iterator>( this, stacks_.crbegin(), stacks_.crend()); }, - []() -> ValueOrFailure> { - return absl::optional(); + []() -> ServerToClientNextMessage { + return ServerToClientNextMessage(); }); }); } @@ -1654,6 +1880,20 @@ class CallFilters { static char g_empty_call_data_; }; +static_assert( + filters_detail::ArgumentMustBeNextMessage< + absl::remove_cvref_t() + ->PullServerToClientMessage()() + .value())>>::value(), + "PullServerToClientMessage must return a NextMessage"); + +static_assert( + filters_detail::ArgumentMustBeNextMessage< + absl::remove_cvref_t() + ->PullClientToServerMessage()() + .value())>>::value(), + "PullServerToClientMessage must return a NextMessage"); + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index a962c308f69..4e8ff890cc0 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -205,6 +205,8 @@ class CallSpine final : public Party { class CallInitiator { public: + using NextMessage = ServerToClientNextMessage; + CallInitiator() = default; explicit CallInitiator(RefCountedPtr spine) : spine_(std::move(spine)) {} @@ -275,6 +277,8 @@ class CallInitiator { class CallHandler { public: + using NextMessage = ClientToServerNextMessage; + explicit CallHandler(RefCountedPtr spine) : spine_(std::move(spine)) {} diff --git a/src/core/server/server.cc b/src/core/server/server.cc index ae404ac1065..d984f8479da 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -831,14 +831,24 @@ auto Server::MatchAndPublishCall(CallHandler call_handler) { payload_handling = registered_method->payload_handling; rm = registered_method->matcher.get(); } + using FirstMessageResult = + ValueOrFailure>; auto maybe_read_first_message = If( payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, [call_handler]() mutable { - return call_handler.PullMessage(); + return Map( + call_handler.PullMessage(), + [](ClientToServerNextMessage next_msg) + -> FirstMessageResult { + if (!next_msg.ok()) return Failure{}; + if (!next_msg.has_value()) { + return FirstMessageResult(absl::nullopt); + } + return FirstMessageResult(next_msg.TakeValue()); + }); }, - []() -> ValueOrFailure> { - return ValueOrFailure>( - absl::nullopt); + []() -> FirstMessageResult { + return FirstMessageResult(absl::nullopt); }); return TryJoin( std::move(maybe_read_first_message), rm->MatchRequest(0), diff --git a/test/core/call/bm_client_call.cc b/test/core/call/bm_client_call.cc index b79ae1f9caf..d72e705e073 100644 --- a/test/core/call/bm_client_call.cc +++ b/test/core/call/bm_client_call.cc @@ -150,7 +150,7 @@ void BM_Unary(benchmark::State& state) { return status.status(); }), Map(handler.PullMessage(), - [](ValueOrFailure> message) { + [](ClientToServerNextMessage message) { return message.status(); }), handler.PushMessage(std::move(response))), diff --git a/test/core/transport/call_spine_benchmarks.h b/test/core/transport/call_spine_benchmarks.h index c62f1e21733..c4bd19f7a44 100644 --- a/test/core/transport/call_spine_benchmarks.h +++ b/test/core/transport/call_spine_benchmarks.h @@ -55,9 +55,7 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { return md.status(); }), Map(handler.PullMessage(), - [](ValueOrFailure> msg) { - return msg.status(); - }), + [](ClientToServerNextMessage msg) { return msg.status(); }), handler.PushMessage(fixture.MakePayload())), [&handler_done, &fixture, handler](StatusFlag status) mutable { CHECK(status.ok()); @@ -67,29 +65,27 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { return Empty{}; }); }); - call.initiator.SpawnInfallible( - "initiator", - [initiator = call.initiator, &fixture, &initiator_done]() mutable { - return Map( - AllOk( - Map(initiator.PushMessage(fixture.MakePayload()), - [](StatusFlag) { return Success{}; }), - Map(initiator.PullServerInitialMetadata(), - [](absl::optional md) { - return Success{}; - }), - Map(initiator.PullMessage(), - [](ValueOrFailure> msg) { - return msg.status(); - }), - Map(initiator.PullServerTrailingMetadata(), - [](ServerMetadataHandle) { return Success(); })), - [&initiator_done](StatusFlag result) { - CHECK(result.ok()); - initiator_done.Notify(); - return Empty{}; - }); - }); + call.initiator.SpawnInfallible("initiator", [initiator = call.initiator, + &fixture, + &initiator_done]() mutable { + return Map( + AllOk( + Map(initiator.PushMessage(fixture.MakePayload()), + [](StatusFlag) { return Success{}; }), + Map(initiator.PullServerInitialMetadata(), + [](absl::optional md) { + return Success{}; + }), + Map(initiator.PullMessage(), + [](ServerToClientNextMessage msg) { return msg.status(); }), + Map(initiator.PullServerTrailingMetadata(), + [](ServerMetadataHandle) { return Success(); })), + [&initiator_done](StatusFlag result) { + CHECK(result.ok()); + initiator_done.Notify(); + return Empty{}; + }); + }); } handler_done.WaitForNotification(); initiator_done.WaitForNotification(); @@ -127,7 +123,7 @@ void BM_UnaryWithSpawnPerOp(benchmark::State& state) { [](ValueOrFailure md) { CHECK(md.ok()); }); handler_spawner.Spawn( "HANDLER:PullMessage", [&]() { return call.handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { CHECK(msg.ok()); call.handler.SpawnInfallible( "HANDLER:PushServerTrailingMetadata", [&]() { @@ -155,9 +151,7 @@ void BM_UnaryWithSpawnPerOp(benchmark::State& state) { initiator_spawner.Spawn( "INITIATOR:PullMessage", [&]() { return call.initiator.PullMessage(); }, - [](ValueOrFailure> msg) { - CHECK(msg.ok()); - }); + [](ServerToClientNextMessage msg) { CHECK(msg.ok()); }); initiator_spawner.Spawn( "INITIATOR:PullServerTrailingMetadata", [&]() { return call.initiator.PullServerTrailingMetadata(); }, @@ -202,7 +196,7 @@ void BM_ClientToServerStreaming(benchmark::State& state) { Notification initiator_done; call.handler.SpawnInfallible("handler", [&]() { return Map(call.handler.PullMessage(), - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { CHECK(msg.ok()); handler_done.Notify(); return Empty{}; diff --git a/test/core/transport/call_spine_test.cc b/test/core/transport/call_spine_test.cc index cd1c8020d5f..bd52ccea21c 100644 --- a/test/core/transport/call_spine_test.cc +++ b/test/core/transport/call_spine_test.cc @@ -89,16 +89,16 @@ void CallSpineTest::UnaryRequest(CallInitiator initiator, CallHandler handler) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [initiator](ValueOrFailure md) mutable { @@ -116,16 +116,15 @@ void CallSpineTest::UnaryRequest(CallInitiator initiator, CallHandler handler) { kTestPath); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); diff --git a/test/core/transport/call_state_test.cc b/test/core/transport/call_state_test.cc index d8b14eab6aa..f39895cc8f2 100644 --- a/test/core/transport/call_state_test.cc +++ b/test/core/transport/call_state_test.cc @@ -333,6 +333,44 @@ TEST(CallStateTest, CanWaitForPullServerMessage) { EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsReady(Success{})); } +TEST(CallStateTest, ClientSendBlockedUntilPullCompletes) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); + state.FinishPullServerInitialMetadata(); + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); +} + +TEST(CallStateTest, ServerSendBlockedUntilPullCompletes) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); + state.FinishPullServerInitialMetadata(); + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); +} + } // namespace grpc_core int main(int argc, char** argv) { diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 1739d1bbcec..3481230ee10 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -154,17 +154,16 @@ TEST_F(TransportTest, AddOneStream) { return Empty{}; }, [initiator]() mutable { return initiator.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, [initiator]() mutable { return initiator.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, [initiator]() mutable { @@ -245,25 +244,23 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "87654321"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "87654321"); return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, initiator.PullServerTrailingMetadata(), diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index 72e62a89682..7fa61d7f3a6 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -137,17 +137,16 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) { return Empty{}; }, [handler]() mutable { return handler.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, [handler]() mutable { return handler.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, [handler]() mutable { diff --git a/test/core/transport/test_suite/call_content.cc b/test/core/transport/test_suite/call_content.cc index 84a5f9f1a13..82531bfa485 100644 --- a/test/core/transport/test_suite/call_content.cc +++ b/test/core/transport/test_suite/call_content.cc @@ -88,16 +88,15 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(server_initial_metadata)); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - server_payload); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), server_payload); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -115,16 +114,15 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(client_initial_metadata)); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - client_payload); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), client_payload); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); FillMetadata(server_initial_metadata, *md); return handler.PushServerInitialMetadata(std::move(md)); diff --git a/test/core/transport/test_suite/call_shapes.cc b/test/core/transport/test_suite/call_shapes.cc index 343720fe2d3..7cfecf26af7 100644 --- a/test/core/transport/test_suite/call_shapes.cc +++ b/test/core/transport/test_suite/call_shapes.cc @@ -49,9 +49,9 @@ TRANSPORT_TEST(MetadataOnlyRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -200,16 +200,16 @@ TRANSPORT_TEST(UnaryRequest) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -227,16 +227,15 @@ TRANSPORT_TEST(UnaryRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -279,10 +278,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullServerTrailingMetadata(); }, @@ -301,11 +300,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -346,16 +344,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -380,16 +378,15 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, @@ -444,9 +441,9 @@ TRANSPORT_TEST(ClientStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -471,44 +468,39 @@ TRANSPORT_TEST(ClientStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (2)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (2)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (3)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (3)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (4)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (4)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (5)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (5)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); handler.PushServerTrailingMetadata(std::move(md)); @@ -533,51 +525,51 @@ TRANSPORT_TEST(ServerStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (2)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (3)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (4)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (5)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (6)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -602,9 +594,9 @@ TRANSPORT_TEST(ServerStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, diff --git a/test/core/transport/test_suite/stress.cc b/test/core/transport/test_suite/stress.cc index 98a2997c1b5..548b06640cb 100644 --- a/test/core/transport/test_suite/stress.cc +++ b/test/core/transport/test_suite/stress.cc @@ -53,17 +53,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [initiator, i, &server_messages]( - ValueOrFailure> msg) mutable { + [initiator, i, + &server_messages](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), server_messages[i]); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [initiator](ValueOrFailure md) mutable { @@ -87,17 +87,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { &*this_call_index)); return handler.PullMessage(); }, - [handler, this_call_index, &client_messages]( - ValueOrFailure> msg) mutable { + [handler, this_call_index, + &client_messages](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), client_messages[*this_call_index]); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md));