From 9945066c88e41859920f6154aa57fb6328570f6f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 16 Oct 2024 14:55:32 -0700 Subject: [PATCH] [call-v3] Send flow control (#37868) This is missing in v3 vs v2 - in v2 we had Pipe setup so that multiple Pipe stages could be chained and only complete when the last stage had passed flow control, whereas in v3 the top stage will start accepting requests as soon as the first stage in the pipeline takes the message. Closes #37868 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37868 from ctiller:drizzling 69209da8a70edd252c7412c34d8fa280867a3aeb PiperOrigin-RevId: 686652402 --- build_autogenerated.yaml | 1 + src/core/BUILD | 1 + src/core/lib/promise/for_each.h | 10 +- src/core/lib/surface/call_utils.cc | 41 --- src/core/lib/surface/call_utils.h | 42 ++- src/core/lib/transport/call_filters.h | 280 ++++++++++++++++-- src/core/lib/transport/call_spine.h | 4 + src/core/server/server.cc | 18 +- test/core/call/bm_client_call.cc | 2 +- test/core/transport/call_spine_benchmarks.h | 56 ++-- test/core/transport/call_spine_test.cc | 21 +- test/core/transport/call_state_test.cc | 38 +++ .../chaotic_good/client_transport_test.cc | 29 +- .../chaotic_good/server_transport_test.cc | 11 +- .../core/transport/test_suite/call_content.cc | 22 +- test/core/transport/test_suite/call_shapes.cc | 146 +++++---- test/core/transport/test_suite/stress.cc | 24 +- 17 files changed, 507 insertions(+), 239 deletions(-) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 8c5bc155ed6..54358bb3891 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -6321,6 +6321,7 @@ targets: - src/core/lib/promise/detail/seq_state.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/exec_ctx_wakeup_scheduler.h + - src/core/lib/promise/for_each.h - src/core/lib/promise/if.h - src/core/lib/promise/latch.h - src/core/lib/promise/loop.h diff --git a/src/core/BUILD b/src/core/BUILD index 38b1050695e..a6b01f761c4 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -8162,6 +8162,7 @@ grpc_cc_library( "call_final_info", "call_state", "dump_args", + "for_each", "if", "latch", "map", diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 3cb965aab7f..b3c266b578c 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -77,8 +77,8 @@ struct NextValueTraits> { return NextValueType::kEndOfStream; } - GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value& MutableValue(T& t) { - return *t; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue(T& t) { + return std::move(*t); } }; @@ -95,9 +95,9 @@ struct NextValueTraits>> { return NextValueType::kError; } - GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value& MutableValue( + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue( ValueOrFailure>& t) { - return **t; + return std::move(**t); } }; @@ -179,7 +179,7 @@ class ForEach { << DebugTag() << " PollReaderNext: got value"; Destruct(&reader_next_); auto action = action_factory_.Make( - std::move(NextValueTraits::MutableValue(*p))); + NextValueTraits::TakeValue(*p)); Construct(&in_action_, std::move(action), std::move(*p)); reading_next_ = false; return PollAction(); diff --git a/src/core/lib/surface/call_utils.cc b/src/core/lib/surface/call_utils.cc index e5949d9d200..41e122b03db 100644 --- a/src/core/lib/surface/call_utils.cc +++ b/src/core/lib/surface/call_utils.cc @@ -166,47 +166,6 @@ std::string WaitForCqEndOp::StateString(const State& state) { [](const Invalid&) -> std::string { return "Invalid{}"; }); } -//////////////////////////////////////////////////////////////////////// -// MessageReceiver - -StatusFlag MessageReceiver::FinishRecvMessage( - ValueOrFailure> result) { - if (!result.ok()) { - GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received end-of-stream with error"; - *recv_message_ = nullptr; - recv_message_ = nullptr; - return Failure{}; - } - if (!result->has_value()) { - GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received end-of-stream"; - *recv_message_ = nullptr; - recv_message_ = nullptr; - return Success{}; - } - MessageHandle& message = **result; - test_only_last_message_flags_ = message->flags(); - if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && - (incoming_compression_algorithm_ != GRPC_COMPRESS_NONE)) { - *recv_message_ = grpc_raw_compressed_byte_buffer_create( - nullptr, 0, incoming_compression_algorithm_); - } else { - *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); - } - grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), - &(*recv_message_)->data.raw.slice_buffer); - GRPC_TRACE_LOG(call, INFO) - << Activity::current()->DebugTag() - << "[call] RecvMessage: outstanding_recv " - "finishes: received " - << (*recv_message_)->data.raw.slice_buffer.length << " byte message"; - recv_message_ = nullptr; - return Success{}; -} - //////////////////////////////////////////////////////////////////////// // MakeErrorString diff --git a/src/core/lib/surface/call_utils.h b/src/core/lib/surface/call_utils.h index 269e2f25178..16d2a174a3a 100644 --- a/src/core/lib/surface/call_utils.h +++ b/src/core/lib/surface/call_utils.h @@ -422,15 +422,51 @@ class MessageReceiver { recv_message_ = op.data.recv_message.recv_message; return [this, puller]() mutable { return Map(puller->PullMessage(), - [this](ValueOrFailure> msg) { + [this](typename Puller::NextMessage msg) { return FinishRecvMessage(std::move(msg)); }); }; } private: - StatusFlag FinishRecvMessage( - ValueOrFailure> result); + template + StatusFlag FinishRecvMessage(NextMessage result) { + if (!result.ok()) { + GRPC_TRACE_LOG(call, INFO) + << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received end-of-stream with error"; + *recv_message_ = nullptr; + recv_message_ = nullptr; + return Failure{}; + } + if (!result.has_value()) { + GRPC_TRACE_LOG(call, INFO) << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received end-of-stream"; + *recv_message_ = nullptr; + recv_message_ = nullptr; + return Success{}; + } + MessageHandle message = result.TakeValue(); + test_only_last_message_flags_ = message->flags(); + if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && + (incoming_compression_algorithm_ != GRPC_COMPRESS_NONE)) { + *recv_message_ = grpc_raw_compressed_byte_buffer_create( + nullptr, 0, incoming_compression_algorithm_); + } else { + *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); + } + grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), + &(*recv_message_)->data.raw.slice_buffer); + GRPC_TRACE_LOG(call, INFO) + << Activity::current()->DebugTag() + << "[call] RecvMessage: outstanding_recv " + "finishes: received " + << (*recv_message_)->data.raw.slice_buffer.length << " byte message"; + recv_message_ = nullptr; + return Success{}; + } grpc_byte_buffer** recv_message_ = nullptr; uint32_t test_only_last_message_flags_ = 0; diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h index bbdfa4a2fed..27c2a75d60b 100644 --- a/src/core/lib/transport/call_filters.h +++ b/src/core/lib/transport/call_filters.h @@ -24,6 +24,7 @@ #include #include "absl/log/check.h" +#include "src/core/lib/promise/for_each.h" #include "src/core/lib/promise/if.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/map.h" @@ -121,6 +122,104 @@ struct NoInterceptor {}; namespace filters_detail { +// Flow control across pipe stages. +// This ends up being exceedingly subtle - essentially we need to ensure that +// across a series of pipes we have no more than one outstanding message at a +// time - but those pipes are for the most part independent. +// How we achieve this is that this NextMessage object holds both the message +// and a completion token - the last owning NextMessage instance will call +// the on_progress method on the referenced CallState - and at that point that +// CallState will allow the next message to be sent through it. +// Next, the ForEach promise combiner explicitly holds onto the wrapper object +// owning the result (this object) and extracts the message from it, but doesn't +// dispose that instance until the action promise for the ForEach iteration +// completes, ensuring most callers need do nothing special to have the +// flow control work correctly. +template +class NextMessage { + public: + ~NextMessage() { + if (message_ != end_of_stream() && message_ != error() && + message_ != taken()) { + delete message_; + } + if (call_state_ != nullptr) { + (call_state_->*on_progress)(); + } + } + + NextMessage() = default; + explicit NextMessage(Failure) : message_(error()), call_state_(nullptr) {} + NextMessage(MessageHandle message, CallState* call_state) { + DCHECK_NE(call_state, nullptr); + DCHECK_NE(message.get(), nullptr); + DCHECK(message.get_deleter().has_freelist()); + message_ = message.release(); + call_state_ = call_state; + } + NextMessage(const NextMessage& other) = delete; + NextMessage& operator=(const NextMessage& other) = delete; + NextMessage(NextMessage&& other) noexcept + : message_(std::exchange(other.message_, taken())), + call_state_(std::exchange(other.call_state_, nullptr)) {} + NextMessage& operator=(NextMessage&& other) noexcept { + if (message_ != end_of_stream() && message_ != error() && + message_ != taken()) { + delete message_; + } + if (call_state_ != nullptr) { + (call_state_->*on_progress)(); + } + message_ = std::exchange(other.message_, taken()); + call_state_ = std::exchange(other.call_state_, nullptr); + return *this; + } + + bool ok() const { + DCHECK_NE(message_, taken()); + return message_ != error(); + } + bool has_value() const { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + return message_ != end_of_stream(); + } + StatusFlag status() const { return StatusFlag(ok()); } + Message& value() { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + DCHECK(has_value()); + return *message_; + } + MessageHandle TakeValue() { + DCHECK_NE(message_, taken()); + DCHECK(ok()); + DCHECK(has_value()); + return MessageHandle(std::exchange(message_, taken()), + Arena::PooledDeleter()); + } + bool progressed() const { return call_state_ == nullptr; } + void Progress() { + DCHECK(!progressed()); + (call_state_->*on_progress)(); + call_state_ = nullptr; + } + + private: + static Message* end_of_stream() { return nullptr; } + static Message* error() { return reinterpret_cast(1); } + static Message* taken() { return reinterpret_cast(2); } + Message* message_ = end_of_stream(); + CallState* call_state_ = nullptr; +}; + +template +struct ArgumentMustBeNextMessage; +template +struct ArgumentMustBeNextMessage> { + static constexpr bool value() { return true; } +}; + inline void* Offset(void* base, size_t amt) { return static_cast(base) + amt; } @@ -1301,6 +1400,80 @@ const NoInterceptor ClientInitialMetadataInterceptor::Call::OnFinalize; } // namespace filters_detail +namespace for_each_detail { +template +struct NextValueTraits> { + using NextMsg = filters_detail::NextMessage; + using Value = MessageHandle; + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type( + const NextMsg& t) { + if (!t.ok()) return NextValueType::kError; + if (t.has_value()) return NextValueType::kValue; + return NextValueType::kEndOfStream; + } + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static MessageHandle TakeValue( + NextMsg& t) { + return t.TakeValue(); + } +}; +} // namespace for_each_detail + +template +struct FailureStatusCastImpl, + StatusFlag> { + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static filters_detail::NextMessage< + on_progress> + Cast(StatusFlag flag) { + DCHECK_EQ(flag, Failure{}); + return filters_detail::NextMessage(Failure{}); + } +}; + +namespace promise_detail { +template +struct TrySeqTraitsWithSfinae> { + using UnwrappedType = MessageHandle; + using WrappedType = filters_detail::NextMessage; + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallFactory( + Next* next, WrappedType&& value) { + return next->Make(value.TakeValue()); + } + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk( + const WrappedType& value) { + return value.ok(); + } + static const char* ErrorString(const WrappedType& status) { + DCHECK(!status.ok()); + return "failed"; + } + template + static R ReturnValue(WrappedType&& status) { + DCHECK(!status.ok()); + return WrappedType(Failure{}); + } + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static auto CallSeqFactory( + F& f, Elem&& elem, WrappedType value) + -> decltype(f(std::forward(elem), std::declval())) { + return f(std::forward(elem), value.TakeValue()); + } + template + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Poll + CheckResultAndRunNext(WrappedType prior, RunNext run_next) { + if (!prior.ok()) return WrappedType(prior.status()); + return run_next(std::move(prior)); + } +}; +} // namespace promise_detail + +using ServerToClientNextMessage = + filters_detail::NextMessage<&CallState::FinishPullServerToClientMessage>; +using ClientToServerNextMessage = + filters_detail::NextMessage<&CallState::FinishPullClientToServerMessage>; + // Execution environment for a stack of filters. // This is a per-call object. class CallFilters { @@ -1415,10 +1588,10 @@ class CallFilters { Input(CallFilters::*input_location), filters_detail::Layout(filters_detail::StackData::*layout), void (CallState::*on_done)(), typename StackIterator> - class Executor { + class MetadataExecutor { public: - Executor(CallFilters* filters, StackIterator stack_begin, - StackIterator stack_end) + MetadataExecutor(CallFilters* filters, StackIterator stack_begin, + StackIterator stack_end) : stack_current_(stack_begin), stack_end_(stack_end), filters_(filters) { @@ -1466,17 +1639,72 @@ class CallFilters { filters_detail::OperationExecutor executor_; }; + template ( + filters_detail::StackData::*layout), + void (CallState::*on_done)(), typename StackIterator> + class MessageExecutor { + public: + using NextMsg = filters_detail::NextMessage; + + MessageExecutor(CallFilters* filters, StackIterator stack_begin, + StackIterator stack_end) + : stack_current_(stack_begin), + stack_end_(stack_end), + filters_(filters) { + DCHECK_NE((filters_->*input_location).get(), nullptr); + } + + Poll operator()() { + if ((filters_->*input_location) != nullptr) { + if (stack_current_ == stack_end_) { + DCHECK_NE((filters_->*input_location).get(), nullptr); + return NextMsg(std::move(filters_->*input_location), + &filters_->call_state_); + } + return FinishStep(executor_.Start( + &(stack_current_->stack->data_.*layout), + std::move(filters_->*input_location), filters_->call_data_)); + } else { + return FinishStep(executor_.Step(filters_->call_data_)); + } + } + + private: + Poll FinishStep(Poll> p) { + auto* r = p.value_if_ready(); + if (r == nullptr) return Pending{}; + if (r->ok != nullptr) { + ++stack_current_; + if (stack_current_ == stack_end_) { + return NextMsg{std::move(r->ok), &filters_->call_state_}; + } + return FinishStep( + executor_.Start(&(stack_current_->stack->data_.*layout), + std::move(r->ok), filters_->call_data_)); + } + (filters_->call_state_.*on_done)(); + filters_->PushServerTrailingMetadata(std::move(r->error)); + return Failure{}; + } + + StackIterator stack_current_; + StackIterator stack_end_; + CallFilters* filters_; + filters_detail::OperationExecutor executor_; + }; + public: // Client: Fetch client initial metadata // Returns a promise that resolves to ValueOrFailure GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { call_state_.BeginPullClientInitialMetadata(); - return Executor(this, stacks_.cbegin(), - stacks_.cend()); + return MetadataExecutor( + this, stacks_.cbegin(), stacks_.cend()); } // Server: Push server initial metadata // Returns a promise that resolves to a StatusFlag indicating success @@ -1496,7 +1724,7 @@ class CallFilters { has_server_initial_metadata, [this]() { return Map( - Executor< + MetadataExecutor< absl::optional, ServerMetadataHandle, &CallFilters::push_server_initial_metadata_, @@ -1526,7 +1754,7 @@ class CallFilters { // Client: Indicate that no more messages will be sent void FinishClientToServerSends() { call_state_.ClientToServerHalfClose(); } // Server: Fetch client to server message - // Returns a promise that resolves to ValueOrFailure + // Returns a promise that resolves to ClientToServerNextMessage GRPC_MUST_USE_RESULT auto PullClientToServerMessage() { return TrySeq( [this]() { @@ -1536,16 +1764,15 @@ class CallFilters { return If( message_available, [this]() { - return Executor< - absl::optional, MessageHandle, + return MessageExecutor< &CallFilters::push_client_to_server_message_, &filters_detail::StackData::client_to_server_messages, &CallState::FinishPullClientToServerMessage, StacksVector::const_iterator>(this, stacks_.cbegin(), stacks_.cend()); }, - []() -> ValueOrFailure> { - return absl::optional(); + []() -> ClientToServerNextMessage { + return ClientToServerNextMessage(); }); }); } @@ -1557,7 +1784,7 @@ class CallFilters { return [this]() { return call_state_.PollPushServerToClientMessage(); }; } // Server: Fetch server to client message - // Returns a promise that resolves to ValueOrFailure + // Returns a promise that resolves to ServerToClientNextMessage GRPC_MUST_USE_RESULT auto PullServerToClientMessage() { return TrySeq( [this]() { @@ -1567,16 +1794,15 @@ class CallFilters { return If( message_available, [this]() { - return Executor< - absl::optional, MessageHandle, + return MessageExecutor< &CallFilters::push_server_to_client_message_, &filters_detail::StackData::server_to_client_messages, &CallState::FinishPullServerToClientMessage, StacksVector::const_reverse_iterator>( this, stacks_.crbegin(), stacks_.crend()); }, - []() -> ValueOrFailure> { - return absl::optional(); + []() -> ServerToClientNextMessage { + return ServerToClientNextMessage(); }); }); } @@ -1654,6 +1880,20 @@ class CallFilters { static char g_empty_call_data_; }; +static_assert( + filters_detail::ArgumentMustBeNextMessage< + absl::remove_cvref_t() + ->PullServerToClientMessage()() + .value())>>::value(), + "PullServerToClientMessage must return a NextMessage"); + +static_assert( + filters_detail::ArgumentMustBeNextMessage< + absl::remove_cvref_t() + ->PullClientToServerMessage()() + .value())>>::value(), + "PullServerToClientMessage must return a NextMessage"); + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H diff --git a/src/core/lib/transport/call_spine.h b/src/core/lib/transport/call_spine.h index a962c308f69..4e8ff890cc0 100644 --- a/src/core/lib/transport/call_spine.h +++ b/src/core/lib/transport/call_spine.h @@ -205,6 +205,8 @@ class CallSpine final : public Party { class CallInitiator { public: + using NextMessage = ServerToClientNextMessage; + CallInitiator() = default; explicit CallInitiator(RefCountedPtr spine) : spine_(std::move(spine)) {} @@ -275,6 +277,8 @@ class CallInitiator { class CallHandler { public: + using NextMessage = ClientToServerNextMessage; + explicit CallHandler(RefCountedPtr spine) : spine_(std::move(spine)) {} diff --git a/src/core/server/server.cc b/src/core/server/server.cc index ae404ac1065..d984f8479da 100644 --- a/src/core/server/server.cc +++ b/src/core/server/server.cc @@ -831,14 +831,24 @@ auto Server::MatchAndPublishCall(CallHandler call_handler) { payload_handling = registered_method->payload_handling; rm = registered_method->matcher.get(); } + using FirstMessageResult = + ValueOrFailure>; auto maybe_read_first_message = If( payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, [call_handler]() mutable { - return call_handler.PullMessage(); + return Map( + call_handler.PullMessage(), + [](ClientToServerNextMessage next_msg) + -> FirstMessageResult { + if (!next_msg.ok()) return Failure{}; + if (!next_msg.has_value()) { + return FirstMessageResult(absl::nullopt); + } + return FirstMessageResult(next_msg.TakeValue()); + }); }, - []() -> ValueOrFailure> { - return ValueOrFailure>( - absl::nullopt); + []() -> FirstMessageResult { + return FirstMessageResult(absl::nullopt); }); return TryJoin( std::move(maybe_read_first_message), rm->MatchRequest(0), diff --git a/test/core/call/bm_client_call.cc b/test/core/call/bm_client_call.cc index b79ae1f9caf..d72e705e073 100644 --- a/test/core/call/bm_client_call.cc +++ b/test/core/call/bm_client_call.cc @@ -150,7 +150,7 @@ void BM_Unary(benchmark::State& state) { return status.status(); }), Map(handler.PullMessage(), - [](ValueOrFailure> message) { + [](ClientToServerNextMessage message) { return message.status(); }), handler.PushMessage(std::move(response))), diff --git a/test/core/transport/call_spine_benchmarks.h b/test/core/transport/call_spine_benchmarks.h index c62f1e21733..c4bd19f7a44 100644 --- a/test/core/transport/call_spine_benchmarks.h +++ b/test/core/transport/call_spine_benchmarks.h @@ -55,9 +55,7 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { return md.status(); }), Map(handler.PullMessage(), - [](ValueOrFailure> msg) { - return msg.status(); - }), + [](ClientToServerNextMessage msg) { return msg.status(); }), handler.PushMessage(fixture.MakePayload())), [&handler_done, &fixture, handler](StatusFlag status) mutable { CHECK(status.ok()); @@ -67,29 +65,27 @@ void BM_UnaryWithSpawnPerEnd(benchmark::State& state) { return Empty{}; }); }); - call.initiator.SpawnInfallible( - "initiator", - [initiator = call.initiator, &fixture, &initiator_done]() mutable { - return Map( - AllOk( - Map(initiator.PushMessage(fixture.MakePayload()), - [](StatusFlag) { return Success{}; }), - Map(initiator.PullServerInitialMetadata(), - [](absl::optional md) { - return Success{}; - }), - Map(initiator.PullMessage(), - [](ValueOrFailure> msg) { - return msg.status(); - }), - Map(initiator.PullServerTrailingMetadata(), - [](ServerMetadataHandle) { return Success(); })), - [&initiator_done](StatusFlag result) { - CHECK(result.ok()); - initiator_done.Notify(); - return Empty{}; - }); - }); + call.initiator.SpawnInfallible("initiator", [initiator = call.initiator, + &fixture, + &initiator_done]() mutable { + return Map( + AllOk( + Map(initiator.PushMessage(fixture.MakePayload()), + [](StatusFlag) { return Success{}; }), + Map(initiator.PullServerInitialMetadata(), + [](absl::optional md) { + return Success{}; + }), + Map(initiator.PullMessage(), + [](ServerToClientNextMessage msg) { return msg.status(); }), + Map(initiator.PullServerTrailingMetadata(), + [](ServerMetadataHandle) { return Success(); })), + [&initiator_done](StatusFlag result) { + CHECK(result.ok()); + initiator_done.Notify(); + return Empty{}; + }); + }); } handler_done.WaitForNotification(); initiator_done.WaitForNotification(); @@ -127,7 +123,7 @@ void BM_UnaryWithSpawnPerOp(benchmark::State& state) { [](ValueOrFailure md) { CHECK(md.ok()); }); handler_spawner.Spawn( "HANDLER:PullMessage", [&]() { return call.handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { CHECK(msg.ok()); call.handler.SpawnInfallible( "HANDLER:PushServerTrailingMetadata", [&]() { @@ -155,9 +151,7 @@ void BM_UnaryWithSpawnPerOp(benchmark::State& state) { initiator_spawner.Spawn( "INITIATOR:PullMessage", [&]() { return call.initiator.PullMessage(); }, - [](ValueOrFailure> msg) { - CHECK(msg.ok()); - }); + [](ServerToClientNextMessage msg) { CHECK(msg.ok()); }); initiator_spawner.Spawn( "INITIATOR:PullServerTrailingMetadata", [&]() { return call.initiator.PullServerTrailingMetadata(); }, @@ -202,7 +196,7 @@ void BM_ClientToServerStreaming(benchmark::State& state) { Notification initiator_done; call.handler.SpawnInfallible("handler", [&]() { return Map(call.handler.PullMessage(), - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { CHECK(msg.ok()); handler_done.Notify(); return Empty{}; diff --git a/test/core/transport/call_spine_test.cc b/test/core/transport/call_spine_test.cc index cd1c8020d5f..bd52ccea21c 100644 --- a/test/core/transport/call_spine_test.cc +++ b/test/core/transport/call_spine_test.cc @@ -89,16 +89,16 @@ void CallSpineTest::UnaryRequest(CallInitiator initiator, CallHandler handler) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [initiator](ValueOrFailure md) mutable { @@ -116,16 +116,15 @@ void CallSpineTest::UnaryRequest(CallInitiator initiator, CallHandler handler) { kTestPath); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); diff --git a/test/core/transport/call_state_test.cc b/test/core/transport/call_state_test.cc index d8b14eab6aa..f39895cc8f2 100644 --- a/test/core/transport/call_state_test.cc +++ b/test/core/transport/call_state_test.cc @@ -333,6 +333,44 @@ TEST(CallStateTest, CanWaitForPullServerMessage) { EXPECT_THAT(state.PollPullServerToClientMessageStarted(), IsReady(Success{})); } +TEST(CallStateTest, ClientSendBlockedUntilPullCompletes) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); + state.FinishPullServerInitialMetadata(); + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_THAT(state.PollPullClientToServerMessageAvailable(), IsReady()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullClientToServerMessage()); + EXPECT_THAT(state.PollPushClientToServerMessage(), IsReady(Success{})); +} + +TEST(CallStateTest, ServerSendBlockedUntilPullCompletes) { + StrictMock activity; + activity.Activate(); + CallState state; + state.Start(); + state.PushServerInitialMetadata(); + EXPECT_THAT(state.PollPullServerInitialMetadataAvailable(), IsReady()); + state.FinishPullServerInitialMetadata(); + state.BeginPullClientInitialMetadata(); + state.FinishPullClientInitialMetadata(); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsPending()); + EXPECT_WAKEUP(activity, state.BeginPushServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_THAT(state.PollPullServerToClientMessageAvailable(), IsReady()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsPending()); + EXPECT_WAKEUP(activity, state.FinishPullServerToClientMessage()); + EXPECT_THAT(state.PollPushServerToClientMessage(), IsReady(Success{})); +} + } // namespace grpc_core int main(int argc, char** argv) { diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc index 1739d1bbcec..3481230ee10 100644 --- a/test/core/transport/chaotic_good/client_transport_test.cc +++ b/test/core/transport/chaotic_good/client_transport_test.cc @@ -154,17 +154,16 @@ TEST_F(TransportTest, AddOneStream) { return Empty{}; }, [initiator]() mutable { return initiator.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, [initiator]() mutable { return initiator.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, [initiator]() mutable { @@ -245,25 +244,23 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) { return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "87654321"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "87654321"); return Empty{}; }, initiator.PullMessage(), - [](ValueOrFailure> msg) { + [](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, initiator.PullServerTrailingMetadata(), diff --git a/test/core/transport/chaotic_good/server_transport_test.cc b/test/core/transport/chaotic_good/server_transport_test.cc index 72e62a89682..7fa61d7f3a6 100644 --- a/test/core/transport/chaotic_good/server_transport_test.cc +++ b/test/core/transport/chaotic_good/server_transport_test.cc @@ -137,17 +137,16 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) { return Empty{}; }, [handler]() mutable { return handler.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "12345678"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "12345678"); return Empty{}; }, [handler]() mutable { return handler.PullMessage(); }, - [](ValueOrFailure> msg) { + [](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return Empty{}; }, [handler]() mutable { diff --git a/test/core/transport/test_suite/call_content.cc b/test/core/transport/test_suite/call_content.cc index 84a5f9f1a13..82531bfa485 100644 --- a/test/core/transport/test_suite/call_content.cc +++ b/test/core/transport/test_suite/call_content.cc @@ -88,16 +88,15 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(server_initial_metadata)); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - server_payload); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), server_payload); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -115,16 +114,15 @@ TRANSPORT_TEST(UnaryWithSomeContent) { UnorderedElementsAreArray(client_initial_metadata)); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - client_payload); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), client_payload); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); FillMetadata(server_initial_metadata, *md); return handler.PushServerInitialMetadata(std::move(md)); diff --git a/test/core/transport/test_suite/call_shapes.cc b/test/core/transport/test_suite/call_shapes.cc index 343720fe2d3..7cfecf26af7 100644 --- a/test/core/transport/test_suite/call_shapes.cc +++ b/test/core/transport/test_suite/call_shapes.cc @@ -49,9 +49,9 @@ TRANSPORT_TEST(MetadataOnlyRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -200,16 +200,16 @@ TRANSPORT_TEST(UnaryRequest) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -227,16 +227,15 @@ TRANSPORT_TEST(UnaryRequest) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -279,10 +278,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullServerTrailingMetadata(); }, @@ -301,11 +300,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) { "/foo/bar"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md)); @@ -346,16 +344,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -380,16 +378,15 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, @@ -444,9 +441,9 @@ TRANSPORT_TEST(ClientStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -471,44 +468,39 @@ TRANSPORT_TEST(ClientStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (2)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (2)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (3)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (3)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (4)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (4)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), - "hello world (5)"); + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "hello world (5)"); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED); handler.PushServerTrailingMetadata(std::move(md)); @@ -533,51 +525,51 @@ TRANSPORT_TEST(ServerStreamingRequest) { initiator.FinishSends(); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (2)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (3)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (4)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (5)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), "why hello neighbor (6)"); return initiator.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ServerToClientNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [&](ValueOrFailure md) { @@ -602,9 +594,9 @@ TRANSPORT_TEST(ServerStreamingRequest) { EXPECT_TRUE(result.ok()); return handler.PullMessage(); }, - [&](ValueOrFailure> msg) { + [&](ClientToServerNextMessage msg) { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return handler.PushMessage(Arena::MakePooled( SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0)); }, diff --git a/test/core/transport/test_suite/stress.cc b/test/core/transport/test_suite/stress.cc index 98a2997c1b5..548b06640cb 100644 --- a/test/core/transport/test_suite/stress.cc +++ b/test/core/transport/test_suite/stress.cc @@ -53,17 +53,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { ContentTypeMetadata::kApplicationGrpc); return initiator.PullMessage(); }, - [initiator, i, &server_messages]( - ValueOrFailure> msg) mutable { + [initiator, i, + &server_messages](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), server_messages[i]); return initiator.PullMessage(); }, - [initiator](ValueOrFailure> msg) mutable { + [initiator](ServerToClientNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); return initiator.PullServerTrailingMetadata(); }, [initiator](ValueOrFailure md) mutable { @@ -87,17 +87,17 @@ TRANSPORT_TEST(ManyUnaryRequests) { &*this_call_index)); return handler.PullMessage(); }, - [handler, this_call_index, &client_messages]( - ValueOrFailure> msg) mutable { + [handler, this_call_index, + &client_messages](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_TRUE(msg.value().has_value()); - EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(), + EXPECT_TRUE(msg.has_value()); + EXPECT_EQ(msg.value().payload()->JoinIntoString(), client_messages[*this_call_index]); return handler.PullMessage(); }, - [handler](ValueOrFailure> msg) mutable { + [handler](ClientToServerNextMessage msg) mutable { EXPECT_TRUE(msg.ok()); - EXPECT_FALSE(msg.value().has_value()); + EXPECT_FALSE(msg.has_value()); auto md = Arena::MakePooledForOverwrite(); md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc); return handler.PushServerInitialMetadata(std::move(md));