[call-v3] Interim step to allow multiple representations of CallSpine (#36321)

Make `CallSpineInterface` describe the operations that we need, and return `Promise<>` types -- these are `std::function` wrappers and involve an allocation.

This ought to be acceptable for the use cases we'll be using `CallSpineInterface` for in the short term - and doing this lets us bring the rest of the v3 stack in concurrently with the "v2.5" stack -- the v2 stack using some of the v3 interfaces I put together to unblock chaotic-good.

Later we'll remove this scaffolding and eventually `CallSpineInterface` in its entirety, in preference to something wrapped around `CallFilters`.

Closes #36321

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36321 from ctiller:cally-2 7f608c36d3
PiperOrigin-RevId: 625096226
pull/36355/head^2
Craig Tiller 10 months ago committed by Copybara-Service
parent 7d04e020df
commit 8a6a02ff4d
  1. 3
      src/core/BUILD
  2. 5
      src/core/ext/filters/deadline/deadline_filter.cc
  3. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  4. 20
      src/core/ext/transport/chaotic_good/client_transport.cc
  5. 54
      src/core/ext/transport/chaotic_good/server_transport.cc
  6. 10
      src/core/ext/transport/inproc/inproc_transport.cc
  7. 16
      src/core/lib/channel/channel_stack_builder_impl.cc
  8. 59
      src/core/lib/channel/connected_channel.cc
  9. 4
      src/core/lib/channel/promise_based_filter.cc
  10. 145
      src/core/lib/channel/promise_based_filter.h
  11. 1
      src/core/lib/channel/server_call_tracer_filter.cc
  12. 84
      src/core/lib/promise/for_each.h
  13. 2
      src/core/lib/promise/pipe.h
  14. 72
      src/core/lib/surface/call.cc
  15. 7
      src/core/lib/surface/call.h
  16. 35
      src/core/lib/surface/server.cc
  17. 2
      src/core/lib/surface/server.h
  18. 7
      src/core/lib/transport/batch_builder.h
  19. 28
      src/core/lib/transport/call_spine.cc
  20. 394
      src/core/lib/transport/call_spine.h
  21. 2
      src/core/lib/transport/transport.h
  22. 9
      test/core/end2end/tests/filter_causes_close.cc
  23. 129
      test/core/transport/chaotic_good/client_transport_error_test.cc
  24. 76
      test/core/transport/chaotic_good/client_transport_test.cc
  25. 92
      test/core/transport/chaotic_good/server_transport_test.cc
  26. 45
      test/core/transport/test_suite/call_content.cc
  27. 337
      test/core/transport/test_suite/call_shapes.cc
  28. 48
      test/core/transport/test_suite/stress.cc
  29. 23
      test/core/transport/test_suite/test.cc
  30. 4
      test/core/transport/test_suite/test.h

@ -6451,7 +6451,6 @@ grpc_cc_library(
"//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_public_hdrs",
"//:grpc_resolver",
"//:grpc_service_config_impl",
@ -7341,7 +7340,6 @@ grpc_cc_library(
],
deps = [
"1999",
"call_final_info",
"for_each",
"if",
"latch",
@ -7353,6 +7351,7 @@ grpc_cc_library(
"status_flag",
"try_seq",
"//:gpr",
"//:promise",
],
)

@ -370,8 +370,9 @@ const grpc_channel_filter grpc_server_deadline_filter = {
return next_promise_factory(std::move(call_args));
},
[](grpc_channel_element*, grpc_core::CallSpineInterface* spine) {
spine->client_initial_metadata().receiver.InterceptAndMap(
[](grpc_core::ClientMetadataHandle md) {
grpc_core::DownCast<grpc_core::PipeBasedCallSpine*>(spine)
->client_initial_metadata()
.receiver.InterceptAndMap([](grpc_core::ClientMetadataHandle md) {
auto deadline = md->get(grpc_core::GrpcTimeoutMetadata());
if (deadline.has_value()) {
grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(

@ -168,7 +168,7 @@ ServerMetadataHandle CheckPayload(const Message& msg,
is_send ? "send" : "recv", msg.payload()->Length(), *max_length);
}
if (msg.payload()->Length() <= *max_length) return nullptr;
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto r = Arena::MakePooled<ServerMetadata>();
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(absl::StrFormat(

@ -102,14 +102,12 @@ auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
},
[]() -> StatusFlag { return Success{}; });
},
[call_handler, trailers = std::move(frame.trailers)]() mutable {
return If(
trailers != nullptr,
[&call_handler, &trailers]() mutable {
return call_handler.PushServerTrailingMetadata(
std::move(trailers));
},
[]() -> StatusFlag { return Success{}; });
[call_handler,
trailers = std::move(frame.trailers)]() mutable -> StatusFlag {
if (trailers != nullptr) {
call_handler.PushServerTrailingMetadata(std::move(trailers));
}
return Success{};
});
// Wrap the actual sequence with something that owns the call handler so that
// its lifetime extends until the push completes.
@ -223,7 +221,7 @@ void ChaoticGoodClientTransport::AbortWithError() {
for (const auto& pair : stream_map) {
auto call_handler = pair.second;
call_handler.SpawnInfallible("cancel", [call_handler]() mutable {
call_handler.Cancel(ServerMetadataFromStatus(
call_handler.PushServerTrailingMetadata(ServerMetadataFromStatus(
absl::UnavailableError("Transport closed.")));
return Empty{};
});
@ -300,6 +298,10 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
[stream_id, this](absl::Status result) {
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: Call %d finished with %s",
stream_id, result.ToString().c_str());
}
if (!result.ok()) {
CancelFrame frame;
frame.stream_id = stream_id;

@ -72,33 +72,29 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
auto ChaoticGoodServerTransport::PushFragmentIntoCall(
CallInitiator call_initiator, ClientFragmentFrame frame,
uint32_t stream_id) {
auto& headers = frame.headers;
return TrySeq(
If(
headers != nullptr,
[call_initiator, &headers]() mutable {
return call_initiator.PushClientInitialMetadata(std::move(headers));
},
[]() -> StatusFlag { return Success{}; }),
[call_initiator, message = std::move(frame.message)]() mutable {
return If(
message.has_value(),
[&call_initiator, &message]() mutable {
return call_initiator.PushMessage(std::move(message->message));
},
[]() -> StatusFlag { return Success{}; });
},
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id]() mutable -> StatusFlag {
if (end_of_stream) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove the call
// from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
}
return Success{};
});
GPR_DEBUG_ASSERT(frame.headers == nullptr);
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: PushFragmentIntoCall: frame=%s",
frame.ToString().c_str());
}
return TrySeq(If(
frame.message.has_value(),
[&call_initiator, &frame]() mutable {
return call_initiator.PushMessage(
std::move(frame.message->message));
},
[]() -> StatusFlag { return Success{}; }),
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id]() mutable -> StatusFlag {
if (end_of_stream) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove
// the call from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
}
return Success{};
});
}
auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
@ -244,8 +240,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
absl::optional<CallInitiator> call_initiator;
if (status.ok()) {
auto create_call_result =
acceptor_->CreateCall(*fragment_frame.headers, arena.release());
auto create_call_result = acceptor_->CreateCall(
std::move(fragment_frame.headers), arena.release());
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "

@ -83,7 +83,7 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
"inproc transport disconnected");
}
absl::StatusOr<CallInitiator> AcceptCall(ClientMetadata& md) {
absl::StatusOr<CallInitiator> AcceptCall(ClientMetadataHandle md) {
switch (state_.load(std::memory_order_acquire)) {
case ConnectionState::kInitial:
return absl::InternalError(
@ -93,7 +93,7 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
case ConnectionState::kReady:
break;
}
return acceptor_->CreateCall(md, acceptor_->CreateArena());
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
}
private:
@ -116,10 +116,10 @@ class InprocClientTransport final : public Transport, public ClientTransport {
TrySeq(call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator = server_transport->AcceptCall(*md);
auto call_initiator =
server_transport->AcceptCall(std::move(md));
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator),
std::move(md));
ForwardCall(call_handler, std::move(*call_initiator));
return absl::OkStatus();
}));
}

@ -95,43 +95,37 @@ const grpc_channel_filter* PromiseTracingFilterFor(
},
/* init_call: */
[](grpc_channel_element* elem, CallSpineInterface* call) {
auto* c = DownCast<PipeBasedCallSpine*>(call);
auto* source_filter =
static_cast<const DerivedFilter*>(elem->filter)->filter;
call->client_initial_metadata().receiver.InterceptAndMap(
c->client_initial_metadata().receiver.InterceptAndMap(
[source_filter](ClientMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
call->client_to_server_messages().receiver.InterceptAndMap(
c->client_to_server_messages().receiver.InterceptAndMap(
[source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str());
return msg;
});
call->server_initial_metadata().sender.InterceptAndMap(
c->server_initial_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
call->server_to_client_messages().sender.InterceptAndMap(
c->server_to_client_messages().sender.InterceptAndMap(
[source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str());
return msg;
});
call->server_trailing_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
},
grpc_channel_next_op,
/* sizeof_call_data: */ 0,

@ -463,8 +463,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
[](absl::Status) {});
// Start a promise to receive server initial metadata and then forward it up
// through the receiving pipe.
auto server_initial_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
auto server_initial_metadata = Arena::MakePooled<ServerMetadata>();
party->Spawn(
"recv_initial_metadata",
TrySeq(GetContext<BatchBuilder>()->ReceiveServerInitialMetadata(
@ -501,27 +500,25 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
// Create a promise that will receive server trailing metadata.
// If this fails, we massage the error into metadata that we can report
// upwards.
auto server_trailing_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
auto recv_trailing_metadata =
Map(GetContext<BatchBuilder>()->ReceiveServerTrailingMetadata(
stream->batch_target()),
[](absl::StatusOr<ServerMetadataHandle> status) mutable {
if (!status.ok()) {
auto server_trailing_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
&status_code, &message, nullptr, nullptr);
server_trailing_metadata->Set(GrpcStatusMetadata(), status_code);
server_trailing_metadata->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
return server_trailing_metadata;
} else {
return std::move(*status);
}
});
auto server_trailing_metadata = Arena::MakePooled<ServerMetadata>();
auto recv_trailing_metadata = Map(
GetContext<BatchBuilder>()->ReceiveServerTrailingMetadata(
stream->batch_target()),
[](absl::StatusOr<ServerMetadataHandle> status) mutable {
if (!status.ok()) {
auto server_trailing_metadata = Arena::MakePooled<ServerMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
&status_code, &message, nullptr, nullptr);
server_trailing_metadata->Set(GrpcStatusMetadata(), status_code);
server_trailing_metadata->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
return server_trailing_metadata;
} else {
return std::move(*status);
}
});
// Finally the main call promise.
// Concurrently: send initial metadata and receive messages, until BOTH
// complete (or one fails).
@ -784,8 +781,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
if (status.ok()) {
trailing_metadata = std::move(*status);
} else {
trailing_metadata =
GetContext<Arena>()->MakePooled<ClientMetadata>();
trailing_metadata = Arena::MakePooled<ClientMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
@ -888,18 +884,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientTransportCallPromise(
Transport* transport, CallArgs call_args, NextPromiseFactory) {
auto spine = GetContext<CallContext>()->MakeCallSpine(std::move(call_args));
transport->client_transport()->StartCall(CallHandler{spine});
return Map(spine->server_trailing_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
if (r.has_value()) {
auto md = std::move(r.value());
md->Set(GrpcStatusFromWire(), true);
return md;
}
auto m = GetContext<Arena>()->MakePooled<ServerMetadata>();
m->Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
m->Set(GrpcCallWasCancelled(), true);
return m;
});
return spine->PullServerTrailingMetadata();
}
const grpc_channel_filter kClientPromiseBasedTransportFilter =

@ -508,7 +508,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
case State::kGotBatch:
if (allow_push_to_pipe) {
state_ = State::kPushedToPipe;
auto message = GetContext<Arena>()->MakePooled<Message>();
auto message = Arena::MakePooled<Message>();
message->payload()->Swap(batch_->payload->send_message.send_message);
message->mutable_flags() = batch_->payload->send_message.flags;
push_ = interceptor()->Push()->Push(std::move(message));
@ -839,7 +839,7 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher,
} else {
state_ = State::kCompletedWhilePushedToPipe;
}
auto message = GetContext<Arena>()->MakePooled<Message>();
auto message = Arena::MakePooled<Message>();
message->payload()->Swap(&**intercepted_slice_buffer_);
message->mutable_flags() = *intercepted_flags_;
push_ = interceptor()->Push()->Push(std::move(message));

@ -534,13 +534,14 @@ inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -548,14 +549,15 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -563,7 +565,7 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, channel](MessageHandle msg) {
@ -575,24 +577,26 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
PipeBasedCallSpine*) {}
template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call](ClientMetadataHandle md) {
@ -605,7 +609,7 @@ template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call, channel](ClientMetadataHandle md) {
@ -617,14 +621,15 @@ inline void InterceptClientInitialMetadata(
template <typename Derived>
inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine,
call](ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -633,28 +638,31 @@ inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md,
Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine, call, channel](
ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md, channel);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
template <typename Derived>
inline void InterceptClientInitialMetadata(
absl::Status (Derived::Call::*fn)(ClientMetadata& md),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine,
call](ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto status = call->OnClientInitialMetadata(*md);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -662,14 +670,16 @@ template <typename Derived>
inline void InterceptClientInitialMetadata(
absl::Status (Derived::Call::*fn)(ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine, call, channel](
ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto status = call->OnClientInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -681,7 +691,7 @@ absl::void_t<decltype(StatusCast<ServerMetadataHandle>(
InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)(
ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(promise_factory == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call, call_spine, channel](ClientMetadataHandle md) {
@ -691,8 +701,9 @@ InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)(
call_spine](PromiseResult<Promise> status) mutable
-> absl::optional<ClientMetadataHandle> {
if (IsStatusOk(status)) return std::move(md);
return call_spine->Cancel(
call_spine->PushServerTrailingMetadata(
StatusCast<ServerMetadataHandle>(std::move(status)));
return absl::nullopt;
});
});
}
@ -766,7 +777,7 @@ inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
@ -778,14 +789,16 @@ inline void InterceptServerInitialMetadata(
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -793,7 +806,7 @@ template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
@ -806,14 +819,16 @@ template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine, channel](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PullServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -885,13 +900,14 @@ inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -899,14 +915,15 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -914,7 +931,7 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, channel](MessageHandle msg) {
@ -926,14 +943,16 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnServerToClientMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
@ -942,40 +961,25 @@ inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md);
return md;
});
void (Derived::Call::*)(ServerMetadata&), typename Derived::Call*, Derived*,
PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md, channel);
return md;
});
void (Derived::Call::*)(ServerMetadata&, Derived*), typename Derived::Call*,
Derived*, PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerTrailingMetadata(*md);
if (status.ok()) return std::move(md);
return ServerMetadataFromStatus(status);
});
absl::Status (Derived::Call::*)(ServerMetadata&), typename Derived::Call*,
Derived*, PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
inline void InterceptFinalize(const NoInterceptor*, void*, void*) {}
@ -1085,23 +1089,20 @@ class ImplementChannelFilter : public ChannelFilter,
GetContext<Arena>()
->ManagedNew<promise_filter_detail::CallWrapper<Derived>>(
static_cast<Derived*>(this));
auto* c = DownCast<PipeBasedCallSpine*>(call_spine);
auto* d = static_cast<Derived*>(this);
promise_filter_detail::InterceptClientInitialMetadata(
&Derived::Call::OnClientInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnClientInitialMetadata, call, d, c);
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnClientToServerMessage, call, d, c);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnServerInitialMetadata, call, d, c);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnServerToClientMessage, call, d, c);
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize,
static_cast<Derived*>(this), call);
&Derived::Call::OnServerTrailingMetadata, call, d, c);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, d,
call);
}
// Polyfill for the original promise scheme.

@ -106,6 +106,7 @@ absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
if (IsChaoticGoodEnabled()) return;
builder->channel_init()->RegisterFilter<ServerCallTracerFilter>(
GRPC_SERVER_CHANNEL);
}

@ -56,13 +56,52 @@ struct Done<StatusFlag> {
static StatusFlag Make(bool cancelled) { return StatusFlag(!cancelled); }
};
template <typename T, typename SfinaeVoid = void>
struct NextValueTraits;
enum class NextValueType {
kValue,
kEndOfStream,
kError,
};
template <typename T>
struct NextValueTraits<T, absl::void_t<typename T::value_type>> {
using Value = typename T::value_type;
static NextValueType Type(const T& t) {
if (t.has_value()) return NextValueType::kValue;
if (t.cancelled()) return NextValueType::kError;
return NextValueType::kEndOfStream;
}
static Value& MutableValue(T& t) { return *t; }
};
template <typename T>
struct NextValueTraits<ValueOrFailure<absl::optional<T>>> {
using Value = T;
static NextValueType Type(const ValueOrFailure<absl::optional<T>>& t) {
if (t.ok()) {
if (t.value().has_value()) return NextValueType::kValue;
return NextValueType::kEndOfStream;
}
return NextValueType::kError;
}
static Value& MutableValue(ValueOrFailure<absl::optional<T>>& t) {
return **t;
}
};
template <typename Reader, typename Action>
class ForEach {
private:
using ReaderNext = decltype(std::declval<Reader>().Next());
using ReaderResult =
typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type;
using ReaderResultValue = typename ReaderResult::value_type;
using ReaderResultValue = typename NextValueTraits<ReaderResult>::Value;
using ActionFactory =
promise_detail::RepeatedPromiseFactory<ReaderResultValue, Action>;
using ActionPromise = typename ActionFactory::Promise;
@ -120,22 +159,37 @@ class ForEach {
Poll<Result> PollReaderNext() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollReaderNext", DebugTag().c_str());
gpr_log(GPR_INFO, "%s PollReaderNext", DebugTag().c_str());
}
auto r = reader_next_();
if (auto* p = r.value_if_ready()) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollReaderNext: got has_value=%s",
DebugTag().c_str(), p->has_value() ? "true" : "false");
}
if (p->has_value()) {
Destruct(&reader_next_);
auto action = action_factory_.Make(std::move(**p));
Construct(&in_action_, std::move(action), std::move(*p));
reading_next_ = false;
return PollAction();
} else {
return Done<Result>::Make(p->cancelled());
switch (NextValueTraits<ReaderResult>::Type(*p)) {
case NextValueType::kValue: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got value",
DebugTag().c_str());
}
Destruct(&reader_next_);
auto action = action_factory_.Make(
std::move(NextValueTraits<ReaderResult>::MutableValue(*p)));
Construct(&in_action_, std::move(action), std::move(*p));
reading_next_ = false;
return PollAction();
}
case NextValueType::kEndOfStream: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got end of stream",
DebugTag().c_str());
}
return Done<Result>::Make(false);
}
case NextValueType::kError: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got error",
DebugTag().c_str());
}
return Done<Result>::Make(true);
}
}
}
return Pending();
@ -143,7 +197,7 @@ class ForEach {
Poll<Result> PollAction() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollAction", DebugTag().c_str());
gpr_log(GPR_INFO, "%s PollAction", DebugTag().c_str());
}
auto r = in_action_.promise();
if (auto* p = r.value_if_ready()) {

@ -89,7 +89,7 @@ class NextResult final {
const T& operator*() const;
T& operator*();
// Only valid if !has_value()
bool cancelled() { return cancelled_; }
bool cancelled() const { return cancelled_; }
private:
RefCountedPtr<pipe_detail::Center<T>> center_;

@ -2739,7 +2739,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
ScopedContext context(this);
args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers(
*args->path, args->registered_method, this->context());
send_initial_metadata_ = GetContext<Arena>()->MakePooled<ClientMetadata>();
send_initial_metadata_ = Arena::MakePooled<ClientMetadata>();
send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path));
if (args->authority.has_value()) {
send_initial_metadata_->Set(HttpAuthorityMetadata(),
@ -2818,7 +2818,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
}
RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs call_args) final {
class WrappingCallSpine final : public CallSpineInterface {
class WrappingCallSpine final : public PipeBasedCallSpine {
public:
WrappingCallSpine(ClientPromiseBasedCall* call,
ClientMetadataHandle metadata)
@ -2859,14 +2859,14 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
return call_->server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override {
return cancel_error_;
}
Latch<bool>& was_cancelled_latch() override {
return was_cancelled_latch_;
}
Party& party() override { return *call_; }
Arena* arena() override { return call_->arena(); }
@ -2886,6 +2886,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
Pipe<ClientMetadataHandle> client_initial_metadata_{call_->arena()};
Pipe<ServerMetadataHandle> server_trailing_metadata_{call_->arena()};
Latch<ServerMetadataHandle> cancel_error_;
Latch<bool> was_cancelled_latch_;
};
GPR_ASSERT(call_args.server_initial_metadata ==
&server_initial_metadata_.sender);
@ -3700,11 +3701,12 @@ ServerPromiseBasedCall::MakeTopOfServerCallPromise(
///////////////////////////////////////////////////////////////////////////////
// CallSpine based Server Call
class ServerCallSpine final : public CallSpineInterface,
class ServerCallSpine final : public PipeBasedCallSpine,
public ServerCallContext,
public BasicPromiseBasedCall {
public:
ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena);
ServerCallSpine(ClientMetadataHandle client_initial_metadata,
ServerInterface* server, Channel* channel, Arena* arena);
// CallSpineInterface
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
@ -3719,10 +3721,8 @@ class ServerCallSpine final : public CallSpineInterface,
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Latch<bool>& was_cancelled_latch() override { return was_cancelled_latch_; }
Party& party() override { return *this; }
Arena* arena() override { return BasicPromiseBasedCall::arena(); }
void IncrementRefCount() override { InternalRef("CallSpine"); }
@ -3735,7 +3735,9 @@ class ServerCallSpine final : public CallSpineInterface,
}
void CancelWithError(grpc_error_handle error) override {
SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
std::ignore = Cancel(ServerMetadataFromStatus(error));
auto status = ServerMetadataFromStatus(error);
status->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(status));
return Empty{};
});
}
@ -3784,15 +3786,15 @@ class ServerCallSpine final : public CallSpineInterface,
Pipe<MessageHandle> client_to_server_messages_;
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_;
// Trailing metadata from server to client
Pipe<ServerMetadataHandle> server_trailing_metadata_;
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
Latch<bool> was_cancelled_latch_;
grpc_byte_buffer** recv_message_ = nullptr;
ClientMetadataHandle client_initial_metadata_stored_;
};
ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
ServerCallSpine::ServerCallSpine(ClientMetadataHandle client_initial_metadata,
ServerInterface* server, Channel* channel,
Arena* arena)
: BasicPromiseBasedCall(arena, 0, 1,
[channel, server]() -> grpc_call_create_args {
@ -3811,11 +3813,15 @@ ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
client_initial_metadata_(arena),
server_initial_metadata_(arena),
client_to_server_messages_(arena),
server_to_client_messages_(arena),
server_trailing_metadata_(arena) {
server_to_client_messages_(arena) {
global_stats().IncrementServerCallsCreated();
ScopedContext ctx(this);
channel->channel_stack()->InitServerCallSpine(this);
SpawnGuarded("push_client_initial_metadata",
[this, md = std::move(client_initial_metadata)]() mutable {
return Map(client_initial_metadata_.sender.Push(std::move(md)),
[](bool r) { return StatusFlag(r); });
});
}
void ServerCallSpine::PublishInitialMetadata(
@ -4081,10 +4087,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
metadata->Set(GrpcMessageMetadata(),
Slice(grpc_slice_copy(*details)));
}
GPR_ASSERT(metadata != nullptr);
return [this, metadata = std::move(metadata)]() mutable {
server_to_client_messages_.sender.Close();
return Map(server_trailing_metadata_.sender.Push(std::move(metadata)),
[](bool r) { return StatusFlag(r); });
GPR_ASSERT(metadata != nullptr);
return [this,
metadata = std::move(metadata)]() mutable -> Poll<Success> {
GPR_ASSERT(metadata != nullptr);
PushServerTrailingMetadata(std::move(metadata));
return Success{};
};
};
});
auto recv_message =
@ -4099,13 +4110,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
};
});
auto primary_ops = AllOk<StatusFlag>(
std::move(send_initial_metadata), std::move(send_message),
std::move(send_trailing_metadata), std::move(recv_message));
TrySeq(AllOk<StatusFlag>(std::move(send_initial_metadata),
std::move(send_message)),
std::move(send_trailing_metadata)),
std::move(recv_message));
if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
auto recv_trailing_metadata = MaybeOp(
ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
return Map(server_trailing_metadata_.receiver.AwaitClosed(),
return Map(WasCancelled(),
[cancelled, this](bool result) -> Success {
ResetDeadline();
*cancelled = result ? 1 : 0;
@ -4141,14 +4154,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
}
}
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
Channel* channel,
Arena* arena) {
return RefCountedPtr<ServerCallSpine>(
arena->New<ServerCallSpine>(server, channel, arena));
RefCountedPtr<CallSpineInterface> MakeServerCall(
ClientMetadataHandle client_initial_metadata, ServerInterface* server,
Channel* channel, Arena* arena) {
return RefCountedPtr<ServerCallSpine>(arena->New<ServerCallSpine>(
std::move(client_initial_metadata), server, channel, arena));
}
#else
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface*, Channel*,
RefCountedPtr<CallSpineInterface> MakeServerCall(ClientMetadataHandle,
ServerInterface*, Channel*,
Arena*) {
Crash("not implemented");
}

@ -158,9 +158,10 @@ class CallContext {
template <>
struct ContextType<CallContext> {};
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
Channel* channel,
Arena* arena);
// TODO(ctiller): remove once call-v3 finalized
RefCountedPtr<CallSpineInterface> MakeServerCall(
ClientMetadataHandle client_initial_metadata, ServerInterface* server,
Channel* channel, Arena* arena);
} // namespace grpc_core

@ -232,7 +232,8 @@ struct Server::RequestedCall {
data.registered.optional_payload = optional_payload;
}
void Complete(NextResult<MessageHandle> payload, ClientMetadata& md) {
template <typename OptionalPayload>
void Complete(OptionalPayload payload, ClientMetadata& md) {
Timestamp deadline = GetContext<CallContext>()->deadline();
switch (type) {
case RequestedCall::Type::BATCH_CALL:
@ -1301,9 +1302,10 @@ Server::ChannelData::~ChannelData() {
Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); }
absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) {
SetRegisteredMethodOnMetadata(client_initial_metadata);
auto call = MakeServerCall(server_.get(), channel_.get(), arena);
ClientMetadataHandle client_initial_metadata, Arena* arena) {
SetRegisteredMethodOnMetadata(*client_initial_metadata);
auto call = MakeServerCall(std::move(client_initial_metadata), server_.get(),
channel_.get(), arena);
InitCall(call);
return CallInitiator(std::move(call));
}
@ -1427,10 +1429,10 @@ void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
call->SpawnGuarded("request_matcher", [this, call]() {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
Map(call->PullClientInitialMetadata(),
[](ValueOrFailure<ClientMetadataHandle> md)
-> absl::StatusOr<ClientMetadataHandle> {
if (!md.has_value()) {
if (!md.ok()) {
return absl::InternalError("Missing metadata");
}
if (!md.value()->get_pointer(HttpPathMetadata())) {
@ -1456,24 +1458,19 @@ void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call]() {
return call->client_to_server_messages().receiver.Next();
},
[]() -> NextResult<MessageHandle> {
return NextResult<MessageHandle>();
[call]() { return call->PullClientToServerMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
Map(std::move(maybe_read_first_message),
[](NextResult<MessageHandle> n) {
return ValueOrFailure<NextResult<MessageHandle>>{
std::move(n)};
}),
rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable {
std::move(maybe_read_first_message), rm->MatchRequest(cq_idx()),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[](std::tuple<NextResult<MessageHandle>,
[](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {

@ -246,7 +246,7 @@ class Server : public ServerInterface,
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) override;
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
private:
class ConnectivityWatcher;

@ -147,8 +147,7 @@ class BatchBuilder {
absl::string_view name() const override { return "receive_message"; }
MessageHandle IntoMessageHandle() {
return GetContext<Arena>()->MakePooled<Message>(std::move(*payload),
flags);
return Arena::MakePooled<Message>(std::move(*payload), flags);
}
absl::optional<SliceBuffer> payload;
@ -161,7 +160,7 @@ class BatchBuilder {
using PendingCompletion::PendingCompletion;
Arena::PoolPtr<grpc_metadata_batch> metadata =
GetContext<Arena>()->MakePooled<grpc_metadata_batch>();
Arena::MakePooled<grpc_metadata_batch>();
protected:
~PendingReceiveMetadata() = default;
@ -328,7 +327,7 @@ inline auto BatchBuilder::SendClientTrailingMetadata(Target target) {
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
batch->batch.send_trailing_metadata = true;
auto metadata = GetContext<Arena>()->MakePooled<grpc_metadata_batch>();
auto metadata = Arena::MakePooled<grpc_metadata_batch>();
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get();
payload_->send_trailing_metadata.sent = nullptr;
pc->send_trailing_metadata = std::move(metadata);

@ -18,16 +18,7 @@
namespace grpc_core {
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
ClientMetadataHandle client_initial_metadata) {
// Send initial metadata.
call_initiator.SpawnGuarded(
"send_initial_metadata",
[client_initial_metadata = std::move(client_initial_metadata),
call_initiator]() mutable {
return call_initiator.PushClientInitialMetadata(
std::move(client_initial_metadata));
});
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
// Read messages from handler into initiator.
call_handler.SpawnGuarded("read_messages", [call_handler,
call_initiator]() mutable {
@ -88,10 +79,10 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
})),
call_initiator.PullServerTrailingMetadata(),
[call_handler](ServerMetadataHandle md) mutable {
call_handler.SpawnGuarded(
"recv_trailing_metadata",
[md = std::move(md), call_handler]() mutable {
return call_handler.PushServerTrailingMetadata(std::move(md));
call_handler.SpawnInfallible(
"recv_trailing", [call_handler, md = std::move(md)]() mutable {
call_handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
return Empty{};
});
@ -99,9 +90,12 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
}
CallInitiatorAndHandler MakeCall(
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) {
auto spine = CallSpine::Create(event_engine, arena);
return {CallInitiator(spine), CallHandler(spine)};
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = CallSpine::Create(std::move(client_initial_metadata),
event_engine, arena, is_arena_owned);
return {CallInitiator(spine), UnstartedCallHandler(spine)};
}
} // namespace grpc_core

@ -25,6 +25,7 @@
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/message.h"
@ -42,12 +43,6 @@ namespace grpc_core {
class CallSpineInterface {
public:
virtual ~CallSpineInterface() = default;
virtual Pipe<ClientMetadataHandle>& client_initial_metadata() = 0;
virtual Pipe<ServerMetadataHandle>& server_initial_metadata() = 0;
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Pipe<ServerMetadataHandle>& server_trailing_metadata() = 0;
virtual Latch<ServerMetadataHandle>& cancel_latch() = 0;
// Add a callback to be called when server trailing metadata is received.
void OnDone(absl::AnyInvocable<void()> fn) {
if (on_done_ == nullptr) {
@ -67,33 +62,24 @@ class CallSpineInterface {
virtual void IncrementRefCount() = 0;
virtual void Unref() = 0;
// Cancel the call with the given metadata.
// Regarding the `MUST_USE_RESULT absl::nullopt_t`:
// Most cancellation calls right now happen in pipe interceptors;
// there `nullopt` indicates terminate processing of this pipe and close with
// error.
// It's convenient then to have the Cancel operation (setting the latch to
// terminate the call) be the last thing that occurs in a pipe interceptor,
// and this construction supports that (and has helped the author not write
// some bugs).
GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
auto& c = cancel_latch();
if (c.is_set()) return absl::nullopt;
c.Set(std::move(metadata));
CallOnDone();
client_initial_metadata().sender.CloseWithError();
server_initial_metadata().sender.CloseWithError();
client_to_server_messages().sender.CloseWithError();
server_to_client_messages().sender.CloseWithError();
server_trailing_metadata().sender.CloseWithError();
return absl::nullopt;
}
auto WaitForCancel() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return cancel_latch().Wait();
}
virtual Promise<ValueOrFailure<absl::optional<ServerMetadataHandle>>>
PullServerInitialMetadata() = 0;
virtual Promise<ServerMetadataHandle> PullServerTrailingMetadata() = 0;
virtual Promise<StatusFlag> PushClientToServerMessage(
MessageHandle message) = 0;
virtual Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullClientToServerMessage() = 0;
virtual Promise<StatusFlag> PushServerToClientMessage(
MessageHandle message) = 0;
virtual Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullServerToClientMessage() = 0;
virtual void PushServerTrailingMetadata(ServerMetadataHandle md) = 0;
virtual void FinishSends() = 0;
virtual Promise<ValueOrFailure<ClientMetadataHandle>>
PullClientInitialMetadata() = 0;
virtual Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) = 0;
virtual Promise<bool> WasCancelled() = 0;
// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
@ -105,7 +91,7 @@ class CallSpineInterface {
using ResultType = typename P::Result;
return Map(std::move(promise), [this](ResultType r) {
if (!IsStatusOk(r)) {
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(r));
PushServerTrailingMetadata(StatusCast<ServerMetadataHandle>(r));
}
return r;
});
@ -121,7 +107,8 @@ class CallSpineInterface {
// Spawn a promise that returns some status-like type; if the status
// represents failure automatically cancel the rest of the call.
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
using FactoryType =
promise_detail::OncePromiseFactory<void, PromiseFactory>;
using PromiseType = typename FactoryType::Promise;
@ -130,27 +117,158 @@ class CallSpineInterface {
std::is_same<bool,
decltype(IsStatusOk(std::declval<ResultType>()))>::value,
"SpawnGuarded promise must return a status-like object");
party().Spawn(name, std::move(promise_factory), [this](ResultType r) {
if (!IsStatusOk(r)) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s",
r.ToString().c_str());
}
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(std::move(r)));
}
});
party().Spawn(
name, std::move(promise_factory), [this, whence](ResultType r) {
if (!IsStatusOk(r)) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "SpawnGuarded sees failure: %s (source: %s:%d)",
r.ToString().c_str(), whence.file(), whence.line());
}
auto status = StatusCast<ServerMetadataHandle>(std::move(r));
status->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(status));
}
});
}
private:
absl::AnyInvocable<void()> on_done_{nullptr};
};
class CallSpine final : public CallSpineInterface, public Party {
// Implementation of CallSpine atop the v2 Pipe based arrangement.
// This implementation will go away in favor of an implementation atop
// CallFilters by the time v3 lands.
class PipeBasedCallSpine : public CallSpineInterface {
public:
virtual Pipe<ClientMetadataHandle>& client_initial_metadata() = 0;
virtual Pipe<ServerMetadataHandle>& server_initial_metadata() = 0;
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Latch<ServerMetadataHandle>& cancel_latch() = 0;
virtual Latch<bool>& was_cancelled_latch() = 0;
Promise<ValueOrFailure<absl::optional<ServerMetadataHandle>>>
PullServerInitialMetadata() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_initial_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> md)
-> ValueOrFailure<absl::optional<ServerMetadataHandle>> {
if (!md.has_value()) {
if (md.cancelled()) return Failure{};
return absl::optional<ServerMetadataHandle>();
}
return absl::optional<ServerMetadataHandle>(std::move(*md));
});
}
Promise<ServerMetadataHandle> PullServerTrailingMetadata() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return cancel_latch().Wait();
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullServerToClientMessage() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_to_client_messages().receiver.Next(), MapNextMessage);
}
Promise<StatusFlag> PushClientToServerMessage(MessageHandle message) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_to_server_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullClientToServerMessage() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_to_server_messages().receiver.Next(), MapNextMessage);
}
Promise<StatusFlag> PushServerToClientMessage(MessageHandle message) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_to_client_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
}
void FinishSends() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
client_to_server_messages().sender.Close();
}
void PushServerTrailingMetadata(ServerMetadataHandle metadata) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
auto& c = cancel_latch();
if (c.is_set()) return;
const bool was_cancelled =
metadata->get(GrpcCallWasCancelled()).value_or(false);
c.Set(std::move(metadata));
CallOnDone();
was_cancelled_latch().Set(was_cancelled);
client_initial_metadata().sender.CloseWithError();
server_initial_metadata().sender.Close();
client_to_server_messages().sender.CloseWithError();
server_to_client_messages().sender.Close();
}
Promise<bool> WasCancelled() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return was_cancelled_latch().Wait();
}
Promise<ValueOrFailure<ClientMetadataHandle>> PullClientInitialMetadata()
final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
}
Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return If(
md.has_value(),
[&md, this]() {
return Map(server_initial_metadata().sender.Push(std::move(*md)),
[](bool ok) { return StatusFlag(ok); });
},
[this]() {
server_initial_metadata().sender.Close();
return []() -> StatusFlag { return Success{}; };
});
}
private:
static ValueOrFailure<absl::optional<MessageHandle>> MapNextMessage(
NextResult<MessageHandle> r) {
if (!r.has_value()) {
if (r.cancelled()) return Failure{};
return absl::optional<MessageHandle>();
}
return absl::optional<MessageHandle>(std::move(*r));
}
};
class CallSpine final : public PipeBasedCallSpine, public Party {
public:
static RefCountedPtr<CallSpine> Create(
grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena) {
return RefCountedPtr<CallSpine>(arena->New<CallSpine>(event_engine, arena));
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = RefCountedPtr<CallSpine>(
arena->New<CallSpine>(event_engine, arena, is_arena_owned));
spine->SpawnInfallible(
"push_client_initial_metadata",
[spine = spine.get(), client_initial_metadata = std::move(
client_initial_metadata)]() mutable {
return Map(spine->client_initial_metadata_.sender.Push(
std::move(client_initial_metadata)),
[](bool) { return Empty{}; });
});
return spine;
}
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
@ -165,10 +283,8 @@ class CallSpine final : public CallSpineInterface, public Party {
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Latch<bool>& was_cancelled_latch() override { return was_cancelled_latch_; }
Party& party() override { return *this; }
Arena* arena() override { return arena_; }
void IncrementRefCount() override { Party::IncrementRefCount(); }
@ -177,8 +293,11 @@ class CallSpine final : public CallSpineInterface, public Party {
private:
friend class Arena;
CallSpine(grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena)
: Party(1), arena_(arena), event_engine_(event_engine) {}
Arena* arena, bool is_arena_owned)
: Party(1),
arena_(arena),
is_arena_owned_(is_arena_owned),
event_engine_(event_engine) {}
class ScopedContext : public ScopedActivity,
public promise_detail::Context<Arena> {
@ -208,6 +327,7 @@ class CallSpine final : public CallSpineInterface, public Party {
}
Arena* arena_;
bool is_arena_owned_;
// Initial metadata from client to server
Pipe<ClientMetadataHandle> client_initial_metadata_{arena()};
// Initial metadata from server to client
@ -216,10 +336,9 @@ class CallSpine final : public CallSpineInterface, public Party {
Pipe<MessageHandle> client_to_server_messages_{arena()};
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_{arena()};
// Trailing metadata from server to client
Pipe<ServerMetadataHandle> server_trailing_metadata_{arena()};
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
Latch<bool> was_cancelled_latch_;
// Event engine associated with this call
grpc_event_engine::experimental::EventEngine* const event_engine_;
};
@ -229,73 +348,31 @@ class CallInitiator {
explicit CallInitiator(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
auto PushClientInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
auto PullServerInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> md)
-> ValueOrFailure<absl::optional<ServerMetadataHandle>> {
if (!md.has_value()) {
if (md.cancelled()) return Failure{};
return absl::optional<ServerMetadataHandle>();
}
return absl::optional<ServerMetadataHandle>(std::move(*md));
});
}
auto PullServerTrailingMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return PrioritizedRace(
Seq(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](NextResult<ServerMetadataHandle> md) mutable {
return [md = std::move(md),
spine]() mutable -> Poll<ServerMetadataHandle> {
// If the pipe was closed at cancellation time, we'll see no
// value here. Return pending and allow the cancellation to win
// the race.
if (!md.has_value()) return Pending{};
spine->server_trailing_metadata().sender.Close();
return std::move(*md);
};
}),
Map(spine_->WaitForCancel(),
[spine = spine_](ServerMetadataHandle md) -> ServerMetadataHandle {
spine->server_trailing_metadata().sender.CloseWithError();
return md;
}));
}
auto PullMessage() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->server_to_client_messages().receiver.Next();
return spine_->PullServerInitialMetadata();
}
auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(
spine_->client_to_server_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
return spine_->PushClientToServerMessage(std::move(message));
}
void FinishSends() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->client_to_server_messages().sender.Close();
}
void FinishSends() { spine_->FinishSends(); }
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
auto PullMessage() { return spine_->PullServerToClientMessage(); }
auto PullServerTrailingMetadata() {
return spine_->PullServerTrailingMetadata();
}
void Cancel() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore =
spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError()));
auto status = ServerMetadataFromStatus(absl::CancelledError());
status->Set(GrpcCallWasCancelled(), true);
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
@ -327,55 +404,59 @@ class CallHandler {
: spine_(std::move(spine)) {}
auto PullClientInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
return spine_->PullClientInitialMetadata();
}
auto PushServerInitialMetadata(absl::optional<ServerMetadataHandle> md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return If(
md.has_value(),
[&md, this]() {
return Map(
spine_->server_initial_metadata().sender.Push(std::move(*md)),
[](bool ok) { return StatusFlag(ok); });
},
[this]() {
spine_->server_initial_metadata().sender.Close();
return []() -> StatusFlag { return Success{}; };
});
return spine_->PushServerInitialMetadata(std::move(md));
}
auto PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->server_initial_metadata().sender.Close();
spine_->server_to_client_messages().sender.Close();
spine_->client_to_server_messages().receiver.CloseWithError();
spine_->CallOnDone();
return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
void PushServerTrailingMetadata(ServerMetadataHandle status) {
spine_->PushServerTrailingMetadata(std::move(status));
}
auto PullMessage() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->client_to_server_messages().receiver.Next();
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(
spine_->server_to_client_messages().sender.Push(std::move(message)),
[](bool ok) { return StatusFlag(ok); });
return spine_->PushServerToClientMessage(std::move(message));
}
auto PullMessage() { return spine_->PullClientToServerMessage(); }
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
spine_->SpawnGuarded(name, std::move(promise_factory), whence);
}
void Cancel(ServerMetadataHandle status) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore = spine_->Cancel(std::move(status));
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}
template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
Arena* arena() { return spine_->arena(); }
private:
RefCountedPtr<CallSpineInterface> spine_;
};
class UnstartedCallHandler {
public:
explicit UnstartedCallHandler(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
void PushServerTrailingMetadata(ServerMetadataHandle status) {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
@ -386,8 +467,9 @@ class CallHandler {
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
spine_->SpawnGuarded(name, std::move(promise_factory), whence);
}
template <typename PromiseFactory>
@ -400,6 +482,11 @@ class CallHandler {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
CallHandler V2HackToStartCallWithoutACallFilterStack() {
GPR_ASSERT(DownCast<PipeBasedCallSpine*>(spine_.get()) != nullptr);
return CallHandler(std::move(spine_));
}
Arena* arena() { return spine_->arena(); }
private:
@ -408,11 +495,13 @@ class CallHandler {
struct CallInitiatorAndHandler {
CallInitiator initiator;
CallHandler handler;
UnstartedCallHandler handler;
};
CallInitiatorAndHandler MakeCall(
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena);
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned);
template <typename CallHalf>
auto OutgoingMessages(CallHalf h) {
@ -425,8 +514,7 @@ auto OutgoingMessages(CallHalf h) {
// Forward a call from `call_handler` to `call_initiator` (with initial metadata
// `client_initial_metadata`)
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
ClientMetadataHandle client_initial_metadata);
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator);
} // namespace grpc_core

@ -559,7 +559,7 @@ class ServerTransport {
// Create a call at the server (or fail)
// arena must have been previously allocated by CreateArena()
virtual absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) = 0;
ClientMetadataHandle client_initial_metadata, Arena* arena) = 0;
protected:
~Acceptor() = default;

@ -102,13 +102,8 @@ const grpc_channel_filter test_filter = {
return Immediate(ServerMetadataFromStatus(
absl::PermissionDeniedError("Failure that's not preventable.")));
},
[](grpc_channel_element*, CallSpineInterface* args) {
args->client_initial_metadata().receiver.InterceptAndMap(
[args](ClientMetadataHandle) {
return args->Cancel(
ServerMetadataFromStatus(absl::PermissionDeniedError(
"Failure that's not preventable.")));
});
[](grpc_channel_element*, CallSpineInterface*) {
Crash("Should never be called");
},
grpc_channel_next_op,
sizeof(call_data),

@ -105,22 +105,21 @@ struct MockPromiseEndpoint {
auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
return Loop([initiator, num_messages]() mutable {
bool has_message = (num_messages > 0);
return If(
has_message,
Seq(initiator.PushMessage(GetContext<Arena>()->MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
}),
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
return If(has_message,
Seq(initiator.PushMessage(Arena::MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
}),
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
});
}
ClientMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ClientMetadata>();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/test"));
return md;
}
@ -178,14 +177,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
call.initiator.SpawnInfallible(
@ -193,7 +191,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -224,14 +222,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
call.initiator.SpawnInfallible(
@ -239,7 +236,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -278,22 +275,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call1.handler));
auto call2 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call2.handler));
call1.initiator.SpawnGuarded("test-send-1", [initiator =
call1.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call2.initiator.SpawnGuarded("test-send-2", [initiator =
call2.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(
"test-send-1", [initiator = call1.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call2.initiator.SpawnGuarded(
"test-send-2", [initiator = call2.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done1;
EXPECT_CALL(on_done1, Call());
StrictMock<MockFunction<void()>> on_done2;
@ -303,7 +300,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -319,7 +316,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -350,22 +347,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call1.handler));
auto call2 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call2.handler));
call1.initiator.SpawnGuarded("test-send", [initiator =
call1.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call2.initiator.SpawnGuarded("test-send", [initiator =
call2.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(
"test-send", [initiator = call1.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call2.initiator.SpawnGuarded(
"test-send", [initiator = call2.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done1;
EXPECT_CALL(on_done1, Call());
StrictMock<MockFunction<void()>> on_done2;
@ -375,7 +372,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -391,7 +388,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),

@ -67,7 +67,7 @@ const uint8_t kGrpcStatus0[] = {0x10, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30};
ClientMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ClientMetadata>();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step"));
return md;
}
@ -78,7 +78,7 @@ auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
bool has_message = (i < num_messages);
return If(
has_message,
Seq(initiator.PushMessage(GetContext<Arena>()->MakePooled<Message>(
Seq(initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(std::to_string(i))), 0)),
[&i]() -> LoopCtl<absl::Status> {
++i;
@ -115,9 +115,9 @@ TEST_F(TransportTest, AddOneStream) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(1024, memory_allocator()));
transport->StartCall(std::move(call.handler));
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(1024, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -133,11 +133,10 @@ TEST_F(TransportTest, AddOneStream) {
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call.initiator.SpawnInfallible(
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
@ -152,18 +151,23 @@ TEST_F(TransportTest, AddOneStream) {
"/demo.Service/Step");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
[initiator]() mutable { return initiator.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[initiator]() mutable { return initiator.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
[initiator]() mutable {
return initiator.PullServerTrailingMetadata();
},
[&on_done](ServerMetadataHandle md) {
EXPECT_EQ(md->get(GrpcStatusMetadata()).value(), GRPC_STATUS_OK);
on_done.Call();
@ -198,9 +202,9 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -221,11 +225,10 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
{EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 2));
});
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 2);
});
call.initiator.SpawnInfallible(
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
@ -241,20 +244,25 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "87654321");
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"87654321");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
initiator.PullServerTrailingMetadata(),

@ -71,13 +71,13 @@ const uint8_t kGrpcStatus0[] = {0x40, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30};
ServerMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step"));
return md;
}
ServerMetadataHandle TestTrailingMetadata() {
auto md = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_OK);
return md;
}
@ -87,7 +87,7 @@ class MockAcceptor : public ServerTransport::Acceptor {
virtual ~MockAcceptor() = default;
MOCK_METHOD(Arena*, CreateArena, (), (override));
MOCK_METHOD(absl::StatusOr<CallInitiator>, CreateCall,
(ClientMetadata & client_initial_metadata, Arena* arena),
(ClientMetadataHandle client_initial_metadata, Arena* arena),
(override));
};
@ -113,18 +113,59 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
// Once that's read we'll create a new call
auto* call_arena = Arena::Create(1024, memory_allocator());
CallInitiatorAndHandler call = MakeCall(event_engine().get(), call_arena);
EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena));
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(acceptor, CreateCall(_, call_arena))
.WillOnce(WithArgs<0>([call_initiator = std::move(call.initiator)](
ClientMetadata& client_initial_metadata) {
EXPECT_EQ(client_initial_metadata.get_pointer(HttpPathMetadata())
.WillOnce(WithArgs<0>([this, call_arena, &on_done](
ClientMetadataHandle client_initial_metadata) {
EXPECT_EQ(client_initial_metadata->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return call_initiator;
CallInitiatorAndHandler call =
MakeCall(std::move(client_initial_metadata), event_engine().get(),
call_arena, true);
auto handler = call.handler.V2HackToStartCallWithoutACallFilterStack();
handler.SpawnInfallible("test-io", [&on_done, handler]() mutable {
return Seq(
handler.PullClientInitialMetadata(),
[](ValueOrFailure<ClientMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
[handler]() mutable { return handler.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
[handler]() mutable { return handler.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
[handler]() mutable {
return handler.PushServerInitialMetadata(TestInitialMetadata());
},
[handler]() mutable {
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("87654321")), 0));
},
[handler, &on_done]() mutable {
handler.PushServerTrailingMetadata(TestTrailingMetadata());
on_done.Call();
return Empty{};
});
});
return std::move(call.initiator);
}));
transport->SetAcceptor(&acceptor);
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
@ -145,39 +186,6 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
sizeof(kGrpcStatus0)),
EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))},
nullptr);
call.handler.SpawnInfallible(
"test-io", [&on_done, handler = call.handler]() mutable {
return Seq(
handler.PullClientInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(
md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
handler.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
return Empty{};
},
handler.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
return Empty{};
},
handler.PushServerInitialMetadata(TestInitialMetadata()),
handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("87654321")), 0)),
[handler]() mutable {
return handler.PushServerTrailingMetadata(TestTrailingMetadata());
},
[&on_done]() mutable {
on_done.Call();
return Empty{};
});
});
// Wait until ClientTransport's internal activities to finish.
event_engine()->TickUntilIdle();
event_engine()->UnsetGlobalHooks();

@ -63,21 +63,17 @@ void FillMetadata(const std::vector<std::pair<std::string, std::string>>& md,
TRANSPORT_TEST(UnaryWithSomeContent) {
SetServerAcceptor();
auto initiator = CreateCall();
const auto client_initial_metadata = RandomMetadata();
const auto server_initial_metadata = RandomMetadata();
const auto server_trailing_metadata = RandomMetadata();
const auto client_payload = RandomMessage();
const auto server_payload = RandomMessage();
auto md = Arena::MakePooled<ClientMetadata>();
FillMetadata(client_initial_metadata, *md);
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
FillMetadata(client_initial_metadata, *md);
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_payload)), 0));
},
@ -93,14 +89,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
UnorderedElementsAreArray(server_initial_metadata));
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), server_payload);
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
server_payload);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -118,14 +116,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
UnorderedElementsAreArray(client_initial_metadata));
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), client_payload);
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
client_payload);
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
FillMetadata(server_initial_metadata, *md);
return handler.PushServerInitialMetadata(std::move(md));
@ -139,10 +139,7 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
FillMetadata(server_trailing_metadata, *md);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();

@ -18,16 +18,12 @@ namespace grpc_core {
TRANSPORT_TEST(MetadataOnlyRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
@ -53,8 +49,9 @@ TRANSPORT_TEST(MetadataOnlyRequest) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -63,10 +60,7 @@ TRANSPORT_TEST(MetadataOnlyRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -79,16 +73,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
@ -123,10 +113,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -139,16 +126,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
@ -175,10 +158,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
// and don't send initial metadata - just trailing metadata.
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -186,18 +166,9 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "start-call",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return Empty{};
});
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
auto handler = TickUntilServerCall();
SpawnTestSeq(initiator, "end-call", [&]() {
initiator.Cancel();
@ -208,16 +179,12 @@ TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
TRANSPORT_TEST(UnaryRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
@ -233,15 +200,16 @@ TRANSPORT_TEST(UnaryRequest) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -259,14 +227,16 @@ TRANSPORT_TEST(UnaryRequest) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -280,10 +250,7 @@ TRANSPORT_TEST(UnaryRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -291,16 +258,12 @@ TRANSPORT_TEST(UnaryRequest) {
TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
@ -316,9 +279,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullServerTrailingMetadata();
},
@ -337,9 +301,11 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -353,10 +319,7 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -364,18 +327,12 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -389,15 +346,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -422,14 +380,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
@ -437,10 +397,7 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -448,18 +405,12 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
TRANSPORT_TEST(ClientStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -493,9 +444,9 @@ TRANSPORT_TEST(ClientStreamingRequest) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -520,40 +471,47 @@ TRANSPORT_TEST(ClientStreamingRequest) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (2)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (2)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (3)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (3)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (4)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (4)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (5)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (5)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -561,18 +519,12 @@ TRANSPORT_TEST(ClientStreamingRequest) {
TRANSPORT_TEST(ServerStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -581,45 +533,51 @@ TRANSPORT_TEST(ServerStreamingRequest) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (2)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (3)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (4)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (5)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (6)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -644,9 +602,9 @@ TRANSPORT_TEST(ServerStreamingRequest) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
@ -679,10 +637,7 @@ TRANSPORT_TEST(ServerStreamingRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();

@ -30,19 +30,14 @@ TRANSPORT_TEST(ManyUnaryRequests) {
std::map<int, std::string> client_messages;
std::map<int, std::string> server_messages;
for (int i = 0; i < kNumRequests; i++) {
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromCopiedString(std::to_string(i)));
auto initiator = CreateCall(std::move(md));
client_messages[i] = RandomMessage();
server_messages[i] = RandomMessage();
SpawnTestSeq(
initiator, make_call_name(i, "initiator"),
[initiator, i]() mutable {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(),
Slice::FromCopiedString(std::to_string(i)));
return initiator.PushClientInitialMetadata(std::move(md));
},
[initiator, i, &client_messages](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[initiator, i, &client_messages]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_messages[i])), 0));
},
@ -59,16 +54,17 @@ TRANSPORT_TEST(ManyUnaryRequests) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[initiator, i,
&server_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[initiator, i, &server_messages](
ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
server_messages[i]);
return initiator.PullMessage();
},
[initiator](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[initiator](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[initiator](ValueOrFailure<ServerMetadataHandle> md) mutable {
@ -92,16 +88,17 @@ TRANSPORT_TEST(ManyUnaryRequests) {
&*this_call_index));
return handler.PullMessage();
},
[handler, this_call_index,
&client_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[handler, this_call_index, &client_messages](
ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
client_messages[*this_call_index]);
return handler.PullMessage();
},
[handler](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[handler](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -118,10 +115,7 @@ TRANSPORT_TEST(ManyUnaryRequests) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
}

@ -56,12 +56,16 @@ void TransportTest::SetServerAcceptor() {
transport_pair_.server->server_transport()->SetAcceptor(&acceptor_);
}
CallInitiator TransportTest::CreateCall() {
auto call = MakeCall(event_engine_.get(), Arena::Create(1024, &allocator_));
call.handler.SpawnInfallible("start-call", [this, handler = call.handler]() {
transport_pair_.client->client_transport()->StartCall(handler);
return Empty{};
});
CallInitiator TransportTest::CreateCall(
ClientMetadataHandle client_initial_metadata) {
auto call = MakeCall(std::move(client_initial_metadata), event_engine_.get(),
Arena::Create(1024, &allocator_), true);
call.handler.SpawnInfallible(
"start-call", [this, handler = call.handler]() mutable {
transport_pair_.client->client_transport()->StartCall(
handler.V2HackToStartCallWithoutACallFilterStack());
return Empty{};
});
return std::move(call.initiator);
}
@ -231,9 +235,10 @@ Arena* TransportTest::Acceptor::CreateArena() {
}
absl::StatusOr<CallInitiator> TransportTest::Acceptor::CreateCall(
ClientMetadata&, Arena* arena) {
auto call = MakeCall(event_engine_, arena);
handlers_.push(std::move(call.handler));
ClientMetadataHandle client_initial_metadata, Arena* arena) {
auto call =
MakeCall(std::move(client_initial_metadata), event_engine_, arena, true);
handlers_.push(call.handler.V2HackToStartCallWithoutACallFilterStack());
return std::move(call.initiator);
}

@ -221,7 +221,7 @@ class TransportTest : public ::testing::Test {
rng_(rng) {}
void SetServerAcceptor();
CallInitiator CreateCall();
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata);
std::string RandomString(int min_length, int max_length,
absl::string_view character_set);
@ -272,7 +272,7 @@ class TransportTest : public ::testing::Test {
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) override;
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
absl::optional<CallHandler> PopHandler();
private:

Loading…
Cancel
Save