pull/36355/head
Craig Tiller 10 months ago
commit 0eb054b748
  1. 1
      bazel/experiments.bzl
  2. 1
      examples/cpp/csm/BUILD
  3. 3
      examples/cpp/csm/csm_greeter_server.cc
  4. 1
      examples/cpp/helloworld/BUILD
  5. 1
      examples/cpp/helloworld/CMakeLists.txt
  6. 8
      examples/cpp/helloworld/xds_greeter_server.cc
  7. 1
      examples/cpp/xds/BUILD
  8. 8
      examples/cpp/xds/xds_greeter_server.cc
  9. 3
      src/core/BUILD
  10. 5
      src/core/ext/filters/deadline/deadline_filter.cc
  11. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  12. 20
      src/core/ext/transport/chaotic_good/client_transport.cc
  13. 54
      src/core/ext/transport/chaotic_good/server_transport.cc
  14. 8
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  15. 3
      src/core/ext/transport/chttp2/transport/frame_data.cc
  16. 10
      src/core/ext/transport/inproc/inproc_transport.cc
  17. 16
      src/core/lib/channel/channel_stack_builder_impl.cc
  18. 59
      src/core/lib/channel/connected_channel.cc
  19. 4
      src/core/lib/channel/promise_based_filter.cc
  20. 145
      src/core/lib/channel/promise_based_filter.h
  21. 1
      src/core/lib/channel/server_call_tracer_filter.cc
  22. 42
      src/core/lib/experiments/experiments.cc
  23. 50
      src/core/lib/experiments/experiments.h
  24. 12
      src/core/lib/experiments/experiments.yaml
  25. 4
      src/core/lib/experiments/rollouts.yaml
  26. 84
      src/core/lib/promise/for_each.h
  27. 2
      src/core/lib/promise/pipe.h
  28. 72
      src/core/lib/surface/call.cc
  29. 7
      src/core/lib/surface/call.h
  30. 35
      src/core/lib/surface/server.cc
  31. 2
      src/core/lib/surface/server.h
  32. 7
      src/core/lib/transport/batch_builder.h
  33. 28
      src/core/lib/transport/call_spine.cc
  34. 394
      src/core/lib/transport/call_spine.h
  35. 2
      src/core/lib/transport/transport.h
  36. 9
      test/core/end2end/tests/filter_causes_close.cc
  37. 3
      test/core/end2end/tests/http2_stats.cc
  38. 129
      test/core/transport/chaotic_good/client_transport_error_test.cc
  39. 76
      test/core/transport/chaotic_good/client_transport_test.cc
  40. 92
      test/core/transport/chaotic_good/server_transport_test.cc
  41. 45
      test/core/transport/test_suite/call_content.cc
  42. 337
      test/core/transport/test_suite/call_shapes.cc
  43. 48
      test/core/transport/test_suite/stress.cc
  44. 23
      test/core/transport/test_suite/test.cc
  45. 4
      test/core/transport/test_suite/test.h
  46. 21
      test/distrib/python/test_packages.sh

@ -25,6 +25,7 @@ EXPERIMENT_ENABLES = {
"event_engine_dns": "event_engine_dns",
"event_engine_listener": "event_engine_listener",
"free_large_allocator": "free_large_allocator",
"http2_stats_fix": "http2_stats_fix",
"keepalive_fix": "keepalive_fix",
"keepalive_server_fix": "keepalive_server_fix",
"monitoring_experiment": "monitoring_experiment",

@ -41,6 +41,7 @@ cc_binary(
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/log",
"@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter",
"@io_opentelemetry_cpp//sdk/src/metrics",
],

@ -22,6 +22,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "opentelemetry/exporters/prometheus/exporter_factory.h"
#include "opentelemetry/exporters/prometheus/exporter_options.h"
@ -91,7 +92,7 @@ void RunServer(const char* hostname) {
xds_builder.AddListeningPort(absl::StrCat("0.0.0.0:", port),
grpc::InsecureServerCredentials());
xds_enabled_server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port);
LOG(INFO) << "Server starting on 0.0.0.0:" << port;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.

@ -129,5 +129,6 @@ cc_binary(
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/log",
],
)

@ -68,6 +68,7 @@ foreach(_target
absl::check
absl::flags
absl::flags_parse
absl::log
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

@ -22,6 +22,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include <grpcpp/ext/admin_services.h>
@ -79,21 +80,20 @@ void RunServer() {
absl::StrCat("0.0.0.0:", port),
grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
xds_enabled_server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port);
LOG(INFO) << "Server starting on 0.0.0.0:" << port;
grpc::AddAdminServices(&builder);
// For the maintenance server, do not use any authentication mechanism.
builder.AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
grpc::InsecureServerCredentials());
server = builder.BuildAndStart();
gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d",
maintenance_port);
LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port;
} else {
grpc::AddAdminServices(&xds_builder);
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(absl::StrCat("0.0.0.0:", port),
grpc::InsecureServerCredentials());
server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port);
LOG(INFO) << "Server listening on 0.0.0.0:" << port;
}
// Wait for the server to shutdown. Note that some other thread must be

@ -37,5 +37,6 @@ cc_binary(
"//examples/protos:helloworld_cc_grpc",
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@com_google_absl//absl/log",
],
)

@ -22,6 +22,7 @@
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include <grpcpp/ext/admin_services.h>
@ -84,21 +85,20 @@ void RunServer() {
absl::StrCat("0.0.0.0:", port),
grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
xds_enabled_server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port);
LOG(INFO) << "Server starting on 0.0.0.0:" << port;
grpc::AddAdminServices(&builder);
// For the maintenance server, do not use any authentication mechanism.
builder.AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
grpc::InsecureServerCredentials());
server = builder.BuildAndStart();
gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d",
maintenance_port);
LOG(INFO) << "Maintenance server listening on 0.0.0.0:" << maintenance_port;
} else {
grpc::AddAdminServices(&xds_builder);
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(absl::StrCat("0.0.0.0:", port),
grpc::InsecureServerCredentials());
server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port);
LOG(INFO) << "Server listening on 0.0.0.0:" << port;
}
// Wait for the server to shutdown. Note that some other thread must be

@ -6451,7 +6451,6 @@ grpc_cc_library(
"//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_public_hdrs",
"//:grpc_resolver",
"//:grpc_service_config_impl",
@ -7341,7 +7340,6 @@ grpc_cc_library(
],
deps = [
"1999",
"call_final_info",
"for_each",
"if",
"latch",
@ -7353,6 +7351,7 @@ grpc_cc_library(
"status_flag",
"try_seq",
"//:gpr",
"//:promise",
],
)

@ -370,8 +370,9 @@ const grpc_channel_filter grpc_server_deadline_filter = {
return next_promise_factory(std::move(call_args));
},
[](grpc_channel_element*, grpc_core::CallSpineInterface* spine) {
spine->client_initial_metadata().receiver.InterceptAndMap(
[](grpc_core::ClientMetadataHandle md) {
grpc_core::DownCast<grpc_core::PipeBasedCallSpine*>(spine)
->client_initial_metadata()
.receiver.InterceptAndMap([](grpc_core::ClientMetadataHandle md) {
auto deadline = md->get(grpc_core::GrpcTimeoutMetadata());
if (deadline.has_value()) {
grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(

@ -168,7 +168,7 @@ ServerMetadataHandle CheckPayload(const Message& msg,
is_send ? "send" : "recv", msg.payload()->Length(), *max_length);
}
if (msg.payload()->Length() <= *max_length) return nullptr;
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto r = Arena::MakePooled<ServerMetadata>();
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(absl::StrFormat(

@ -102,14 +102,12 @@ auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
},
[]() -> StatusFlag { return Success{}; });
},
[call_handler, trailers = std::move(frame.trailers)]() mutable {
return If(
trailers != nullptr,
[&call_handler, &trailers]() mutable {
return call_handler.PushServerTrailingMetadata(
std::move(trailers));
},
[]() -> StatusFlag { return Success{}; });
[call_handler,
trailers = std::move(frame.trailers)]() mutable -> StatusFlag {
if (trailers != nullptr) {
call_handler.PushServerTrailingMetadata(std::move(trailers));
}
return Success{};
});
// Wrap the actual sequence with something that owns the call handler so that
// its lifetime extends until the push completes.
@ -223,7 +221,7 @@ void ChaoticGoodClientTransport::AbortWithError() {
for (const auto& pair : stream_map) {
auto call_handler = pair.second;
call_handler.SpawnInfallible("cancel", [call_handler]() mutable {
call_handler.Cancel(ServerMetadataFromStatus(
call_handler.PushServerTrailingMetadata(ServerMetadataFromStatus(
absl::UnavailableError("Transport closed.")));
return Empty{};
});
@ -300,6 +298,10 @@ void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
const uint32_t stream_id = MakeStream(call_handler);
return Map(CallOutboundLoop(stream_id, call_handler),
[stream_id, this](absl::Status result) {
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: Call %d finished with %s",
stream_id, result.ToString().c_str());
}
if (!result.ok()) {
CancelFrame frame;
frame.stream_id = stream_id;

@ -72,33 +72,29 @@ auto ChaoticGoodServerTransport::TransportWriteLoop(
auto ChaoticGoodServerTransport::PushFragmentIntoCall(
CallInitiator call_initiator, ClientFragmentFrame frame,
uint32_t stream_id) {
auto& headers = frame.headers;
return TrySeq(
If(
headers != nullptr,
[call_initiator, &headers]() mutable {
return call_initiator.PushClientInitialMetadata(std::move(headers));
},
[]() -> StatusFlag { return Success{}; }),
[call_initiator, message = std::move(frame.message)]() mutable {
return If(
message.has_value(),
[&call_initiator, &message]() mutable {
return call_initiator.PushMessage(std::move(message->message));
},
[]() -> StatusFlag { return Success{}; });
},
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id]() mutable -> StatusFlag {
if (end_of_stream) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove the call
// from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
}
return Success{};
});
GPR_DEBUG_ASSERT(frame.headers == nullptr);
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO, "CHAOTIC_GOOD: PushFragmentIntoCall: frame=%s",
frame.ToString().c_str());
}
return TrySeq(If(
frame.message.has_value(),
[&call_initiator, &frame]() mutable {
return call_initiator.PushMessage(
std::move(frame.message->message));
},
[]() -> StatusFlag { return Success{}; }),
[this, call_initiator, end_of_stream = frame.end_of_stream,
stream_id]() mutable -> StatusFlag {
if (end_of_stream) {
call_initiator.FinishSends();
// We have received end_of_stream. It is now safe to remove
// the call from the stream map.
MutexLock lock(&mu_);
stream_map_.erase(stream_id);
}
return Success{};
});
}
auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
@ -244,8 +240,8 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
absl::optional<CallInitiator> call_initiator;
if (status.ok()) {
auto create_call_result =
acceptor_->CreateCall(*fragment_frame.headers, arena.release());
auto create_call_result = acceptor_->CreateCall(
std::move(fragment_frame.headers), arena.release());
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "

@ -1471,9 +1471,11 @@ static void perform_stream_op_locked(void* stream_op,
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
s->stats.outgoing.data_bytes +=
op_payload->send_message.send_message->Length();
if (grpc_core::IsHttp2StatsFixEnabled()) {
s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
s->stats.outgoing.data_bytes +=
op_payload->send_message.send_message->Length();
}
s->next_message_end_offset =
s->flow_controlled_bytes_written +
static_cast<int64_t>(s->flow_controlled_buffer.length) +

@ -78,6 +78,9 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
stats->framing_bytes += header_size;
if (!grpc_core::IsHttp2StatsFixEnabled()) {
stats->data_bytes += write_bytes;
}
}
grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(

@ -83,7 +83,7 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
"inproc transport disconnected");
}
absl::StatusOr<CallInitiator> AcceptCall(ClientMetadata& md) {
absl::StatusOr<CallInitiator> AcceptCall(ClientMetadataHandle md) {
switch (state_.load(std::memory_order_acquire)) {
case ConnectionState::kInitial:
return absl::InternalError(
@ -93,7 +93,7 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
case ConnectionState::kReady:
break;
}
return acceptor_->CreateCall(md, acceptor_->CreateArena());
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
}
private:
@ -116,10 +116,10 @@ class InprocClientTransport final : public Transport, public ClientTransport {
TrySeq(call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator = server_transport->AcceptCall(*md);
auto call_initiator =
server_transport->AcceptCall(std::move(md));
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator),
std::move(md));
ForwardCall(call_handler, std::move(*call_initiator));
return absl::OkStatus();
}));
}

@ -95,43 +95,37 @@ const grpc_channel_filter* PromiseTracingFilterFor(
},
/* init_call: */
[](grpc_channel_element* elem, CallSpineInterface* call) {
auto* c = DownCast<PipeBasedCallSpine*>(call);
auto* source_filter =
static_cast<const DerivedFilter*>(elem->filter)->filter;
call->client_initial_metadata().receiver.InterceptAndMap(
c->client_initial_metadata().receiver.InterceptAndMap(
[source_filter](ClientMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
call->client_to_server_messages().receiver.InterceptAndMap(
c->client_to_server_messages().receiver.InterceptAndMap(
[source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str());
return msg;
});
call->server_initial_metadata().sender.InterceptAndMap(
c->server_initial_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
call->server_to_client_messages().sender.InterceptAndMap(
c->server_to_client_messages().sender.InterceptAndMap(
[source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str());
return msg;
});
call->server_trailing_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s",
GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str());
return md;
});
},
grpc_channel_next_op,
/* sizeof_call_data: */ 0,

@ -463,8 +463,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
[](absl::Status) {});
// Start a promise to receive server initial metadata and then forward it up
// through the receiving pipe.
auto server_initial_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
auto server_initial_metadata = Arena::MakePooled<ServerMetadata>();
party->Spawn(
"recv_initial_metadata",
TrySeq(GetContext<BatchBuilder>()->ReceiveServerInitialMetadata(
@ -501,27 +500,25 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
// Create a promise that will receive server trailing metadata.
// If this fails, we massage the error into metadata that we can report
// upwards.
auto server_trailing_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
auto recv_trailing_metadata =
Map(GetContext<BatchBuilder>()->ReceiveServerTrailingMetadata(
stream->batch_target()),
[](absl::StatusOr<ServerMetadataHandle> status) mutable {
if (!status.ok()) {
auto server_trailing_metadata =
GetContext<Arena>()->MakePooled<ServerMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
&status_code, &message, nullptr, nullptr);
server_trailing_metadata->Set(GrpcStatusMetadata(), status_code);
server_trailing_metadata->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
return server_trailing_metadata;
} else {
return std::move(*status);
}
});
auto server_trailing_metadata = Arena::MakePooled<ServerMetadata>();
auto recv_trailing_metadata = Map(
GetContext<BatchBuilder>()->ReceiveServerTrailingMetadata(
stream->batch_target()),
[](absl::StatusOr<ServerMetadataHandle> status) mutable {
if (!status.ok()) {
auto server_trailing_metadata = Arena::MakePooled<ServerMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
&status_code, &message, nullptr, nullptr);
server_trailing_metadata->Set(GrpcStatusMetadata(), status_code);
server_trailing_metadata->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(message));
return server_trailing_metadata;
} else {
return std::move(*status);
}
});
// Finally the main call promise.
// Concurrently: send initial metadata and receive messages, until BOTH
// complete (or one fails).
@ -784,8 +781,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
if (status.ok()) {
trailing_metadata = std::move(*status);
} else {
trailing_metadata =
GetContext<Arena>()->MakePooled<ClientMetadata>();
trailing_metadata = Arena::MakePooled<ClientMetadata>();
grpc_status_code status_code = GRPC_STATUS_UNKNOWN;
std::string message;
grpc_error_get_status(status.status(), Timestamp::InfFuture(),
@ -888,18 +884,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientTransportCallPromise(
Transport* transport, CallArgs call_args, NextPromiseFactory) {
auto spine = GetContext<CallContext>()->MakeCallSpine(std::move(call_args));
transport->client_transport()->StartCall(CallHandler{spine});
return Map(spine->server_trailing_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> r) {
if (r.has_value()) {
auto md = std::move(r.value());
md->Set(GrpcStatusFromWire(), true);
return md;
}
auto m = GetContext<Arena>()->MakePooled<ServerMetadata>();
m->Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
m->Set(GrpcCallWasCancelled(), true);
return m;
});
return spine->PullServerTrailingMetadata();
}
const grpc_channel_filter kClientPromiseBasedTransportFilter =

@ -508,7 +508,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
case State::kGotBatch:
if (allow_push_to_pipe) {
state_ = State::kPushedToPipe;
auto message = GetContext<Arena>()->MakePooled<Message>();
auto message = Arena::MakePooled<Message>();
message->payload()->Swap(batch_->payload->send_message.send_message);
message->mutable_flags() = batch_->payload->send_message.flags;
push_ = interceptor()->Push()->Push(std::move(message));
@ -839,7 +839,7 @@ void BaseCallData::ReceiveMessage::WakeInsideCombiner(Flusher* flusher,
} else {
state_ = State::kCompletedWhilePushedToPipe;
}
auto message = GetContext<Arena>()->MakePooled<Message>();
auto message = Arena::MakePooled<Message>();
message->payload()->Swap(&**intercepted_slice_buffer_);
message->mutable_flags() = *intercepted_flags_;
push_ = interceptor()->Push()->Push(std::move(message));

@ -534,13 +534,14 @@ inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -548,14 +549,15 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -563,7 +565,7 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, channel](MessageHandle msg) {
@ -575,24 +577,26 @@ template <typename Derived>
inline void InterceptClientToServerMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->client_to_server_messages().receiver.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnClientToServerMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
PipeBasedCallSpine*) {}
template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call](ClientMetadataHandle md) {
@ -605,7 +609,7 @@ template <typename Derived>
inline void InterceptClientInitialMetadata(
void (Derived::Call::*fn)(ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call, channel](ClientMetadataHandle md) {
@ -617,14 +621,15 @@ inline void InterceptClientInitialMetadata(
template <typename Derived>
inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine,
call](ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -633,28 +638,31 @@ inline void InterceptClientInitialMetadata(
ServerMetadataHandle (Derived::Call::*fn)(ClientMetadata& md,
Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine, call, channel](
ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto return_md = call->OnClientInitialMetadata(*md, channel);
if (return_md == nullptr) return std::move(md);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
template <typename Derived>
inline void InterceptClientInitialMetadata(
absl::Status (Derived::Call::*fn)(ClientMetadata& md),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine,
call](ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto status = call->OnClientInitialMetadata(*md);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -662,14 +670,16 @@ template <typename Derived>
inline void InterceptClientInitialMetadata(
absl::Status (Derived::Call::*fn)(ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call_spine, call, channel](
ClientMetadataHandle md) -> absl::optional<ClientMetadataHandle> {
auto status = call->OnClientInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -681,7 +691,7 @@ absl::void_t<decltype(StatusCast<ServerMetadataHandle>(
InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)(
ClientMetadata& md, Derived* channel),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(promise_factory == &Derived::Call::OnClientInitialMetadata);
call_spine->client_initial_metadata().receiver.InterceptAndMap(
[call, call_spine, channel](ClientMetadataHandle md) {
@ -691,8 +701,9 @@ InterceptClientInitialMetadata(Promise (Derived::Call::*promise_factory)(
call_spine](PromiseResult<Promise> status) mutable
-> absl::optional<ClientMetadataHandle> {
if (IsStatusOk(status)) return std::move(md);
return call_spine->Cancel(
call_spine->PushServerTrailingMetadata(
StatusCast<ServerMetadataHandle>(std::move(status)));
return absl::nullopt;
});
});
}
@ -766,7 +777,7 @@ inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
@ -778,14 +789,16 @@ inline void InterceptServerInitialMetadata(
template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -793,7 +806,7 @@ template <typename Derived>
inline void InterceptServerInitialMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
@ -806,14 +819,16 @@ template <typename Derived>
inline void InterceptServerInitialMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerInitialMetadata);
call_spine->server_initial_metadata().sender.InterceptAndMap(
[call, call_spine, channel](
ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerInitialMetadata(*md, channel);
if (status.ok()) return std::move(md);
return call_spine->Cancel(ServerMetadataFromStatus(status));
call_spine->PullServerTrailingMetadata(
ServerMetadataFromStatus(status));
return absl::nullopt;
});
}
@ -885,13 +900,14 @@ inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
typename Derived::Call* call, Derived*, PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -899,14 +915,15 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
call_spine->PushServerTrailingMetadata(std::move(return_md));
return absl::nullopt;
});
}
@ -914,7 +931,7 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
MessageHandle (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, channel](MessageHandle msg) {
@ -926,14 +943,16 @@ template <typename Derived>
inline void InterceptServerToClientMessage(
absl::StatusOr<MessageHandle> (Derived::Call::*fn)(MessageHandle, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
PipeBasedCallSpine* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto r = call->OnServerToClientMessage(std::move(msg), channel);
if (r.ok()) return std::move(*r);
return call_spine->Cancel(ServerMetadataFromStatus(r.status()));
call_spine->PushServerTrailingMetadata(
ServerMetadataFromStatus(r.status()));
return absl::nullopt;
});
}
@ -942,40 +961,25 @@ inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&), typename Derived::Call* call,
Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md);
return md;
});
void (Derived::Call::*)(ServerMetadata&), typename Derived::Call*, Derived*,
PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
void (Derived::Call::*fn)(ServerMetadata&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call, channel](ServerMetadataHandle md) {
call->OnServerTrailingMetadata(*md, channel);
return md;
});
void (Derived::Call::*)(ServerMetadata&, Derived*), typename Derived::Call*,
Derived*, PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
template <typename Derived>
inline void InterceptServerTrailingMetadata(
absl::Status (Derived::Call::*fn)(ServerMetadata&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerTrailingMetadata);
call_spine->server_trailing_metadata().sender.InterceptAndMap(
[call](ServerMetadataHandle md) -> absl::optional<ServerMetadataHandle> {
auto status = call->OnServerTrailingMetadata(*md);
if (status.ok()) return std::move(md);
return ServerMetadataFromStatus(status);
});
absl::Status (Derived::Call::*)(ServerMetadata&), typename Derived::Call*,
Derived*, PipeBasedCallSpine*) {
gpr_log(GPR_ERROR,
"InterceptServerTrailingMetadata not available for call v2.5");
}
inline void InterceptFinalize(const NoInterceptor*, void*, void*) {}
@ -1085,23 +1089,20 @@ class ImplementChannelFilter : public ChannelFilter,
GetContext<Arena>()
->ManagedNew<promise_filter_detail::CallWrapper<Derived>>(
static_cast<Derived*>(this));
auto* c = DownCast<PipeBasedCallSpine*>(call_spine);
auto* d = static_cast<Derived*>(this);
promise_filter_detail::InterceptClientInitialMetadata(
&Derived::Call::OnClientInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnClientInitialMetadata, call, d, c);
promise_filter_detail::InterceptClientToServerMessage(
&Derived::Call::OnClientToServerMessage, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnClientToServerMessage, call, d, c);
promise_filter_detail::InterceptServerInitialMetadata(
&Derived::Call::OnServerInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnServerInitialMetadata, call, d, c);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call,
static_cast<Derived*>(this), call_spine);
&Derived::Call::OnServerToClientMessage, call, d, c);
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize,
static_cast<Derived*>(this), call);
&Derived::Call::OnServerTrailingMetadata, call, d, c);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, d,
call);
}
// Polyfill for the original promise scheme.

@ -108,6 +108,7 @@ ServerCallTracerFilter::Create(const ChannelArgs& /*args*/,
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {
if (IsChaoticGoodEnabled()) return;
builder->channel_init()->RegisterFilter<ServerCallTracerFilter>(
GRPC_SERVER_CHANNEL);
}

@ -48,6 +48,9 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
@ -139,11 +142,6 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
} // namespace
namespace grpc_core {
@ -152,7 +150,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
kDefaultForDebugOnly, true},
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
{"canary_client_privacy", description_canary_client_privacy,
@ -167,6 +165,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, false, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
@ -206,7 +206,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, false, true},
additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size",
description_unconstrained_max_quota_buffer_size,
additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0,
@ -248,6 +248,9 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
@ -339,11 +342,6 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
} // namespace
namespace grpc_core {
@ -352,7 +350,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
kDefaultForDebugOnly, true},
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
{"canary_client_privacy", description_canary_client_privacy,
@ -367,6 +365,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, true, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
@ -406,7 +406,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, false, true},
additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size",
description_unconstrained_max_quota_buffer_size,
additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0,
@ -448,6 +448,9 @@ const char* const additional_constraints_event_engine_listener = "{}";
const char* const description_free_large_allocator =
"If set, return all free bytes from a \042big\042 allocator";
const char* const additional_constraints_free_large_allocator = "{}";
const char* const description_http2_stats_fix =
"Fix on HTTP2 outgoing data stats reporting";
const char* const additional_constraints_http2_stats_fix = "{}";
const char* const description_keepalive_fix =
"Allows overriding keepalive_permit_without_calls. Refer "
"https://github.com/grpc/grpc/pull/33428 for more information.";
@ -539,11 +542,6 @@ const char* const description_work_serializer_dispatch =
const char* const additional_constraints_work_serializer_dispatch = "{}";
const uint8_t required_experiments_work_serializer_dispatch[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient)};
#ifdef NDEBUG
const bool kDefaultForDebugOnly = false;
#else
const bool kDefaultForDebugOnly = true;
#endif
} // namespace
namespace grpc_core {
@ -552,7 +550,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"call_status_override_on_cancellation",
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation, nullptr, 0,
kDefaultForDebugOnly, true},
true, true},
{"call_v3", description_call_v3, additional_constraints_call_v3, nullptr, 0,
false, true},
{"canary_client_privacy", description_canary_client_privacy,
@ -567,6 +565,8 @@ const ExperimentMetadata g_experiment_metadata[] = {
additional_constraints_event_engine_listener, nullptr, 0, true, true},
{"free_large_allocator", description_free_large_allocator,
additional_constraints_free_large_allocator, nullptr, 0, false, true},
{"http2_stats_fix", description_http2_stats_fix,
additional_constraints_http2_stats_fix, nullptr, 0, true, true},
{"keepalive_fix", description_keepalive_fix,
additional_constraints_keepalive_fix, nullptr, 0, false, false},
{"keepalive_server_fix", description_keepalive_server_fix,
@ -606,7 +606,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"tcp_rcv_lowat", description_tcp_rcv_lowat,
additional_constraints_tcp_rcv_lowat, nullptr, 0, false, true},
{"trace_record_callops", description_trace_record_callops,
additional_constraints_trace_record_callops, nullptr, 0, false, true},
additional_constraints_trace_record_callops, nullptr, 0, true, true},
{"unconstrained_max_quota_buffer_size",
description_unconstrained_max_quota_buffer_size,
additional_constraints_unconstrained_max_quota_buffer_size, nullptr, 0,

@ -57,16 +57,8 @@ namespace grpc_core {
#ifdef GRPC_EXPERIMENTS_ARE_FINAL
#if defined(GRPC_CFSTREAM)
#ifndef NDEBUG
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
@ -74,6 +66,8 @@ inline bool IsEventEngineClientEnabled() { return false; }
inline bool IsEventEngineDnsEnabled() { return false; }
inline bool IsEventEngineListenerEnabled() { return false; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
@ -93,23 +87,16 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#elif defined(GPR_WINDOWS)
#ifndef NDEBUG
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
@ -118,6 +105,8 @@ inline bool IsEventEngineDnsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER
inline bool IsEventEngineListenerEnabled() { return true; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
@ -137,23 +126,16 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
inline bool IsWorkSerializerDispatchEnabled() { return false; }
#else
#ifndef NDEBUG
#define GRPC_EXPERIMENT_IS_INCLUDED_CALL_STATUS_OVERRIDE_ON_CANCELLATION
#endif
inline bool IsCallStatusOverrideOnCancellationEnabled() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}
inline bool IsCallStatusOverrideOnCancellationEnabled() { return true; }
inline bool IsCallV3Enabled() { return false; }
inline bool IsCanaryClientPrivacyEnabled() { return false; }
inline bool IsClientPrivacyEnabled() { return false; }
@ -163,6 +145,8 @@ inline bool IsEventEngineDnsEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_EVENT_ENGINE_LISTENER
inline bool IsEventEngineListenerEnabled() { return true; }
inline bool IsFreeLargeAllocatorEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() { return true; }
inline bool IsKeepaliveFixEnabled() { return false; }
inline bool IsKeepaliveServerFixEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_MONITORING_EXPERIMENT
@ -182,7 +166,8 @@ inline bool IsScheduleCancellationOverWriteEnabled() { return false; }
inline bool IsServerPrivacyEnabled() { return false; }
inline bool IsTcpFrameSizeTuningEnabled() { return false; }
inline bool IsTcpRcvLowatEnabled() { return false; }
inline bool IsTraceRecordCallopsEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_TRACE_RECORD_CALLOPS
inline bool IsTraceRecordCallopsEnabled() { return true; }
inline bool IsUnconstrainedMaxQuotaBufferSizeEnabled() { return false; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
inline bool IsWorkSerializerClearsTimeCacheEnabled() { return true; }
@ -200,6 +185,7 @@ enum ExperimentIds {
kExperimentIdEventEngineDns,
kExperimentIdEventEngineListener,
kExperimentIdFreeLargeAllocator,
kExperimentIdHttp2StatsFix,
kExperimentIdKeepaliveFix,
kExperimentIdKeepaliveServerFix,
kExperimentIdMonitoringExperiment,
@ -254,6 +240,10 @@ inline bool IsEventEngineListenerEnabled() {
inline bool IsFreeLargeAllocatorEnabled() {
return IsExperimentEnabled(kExperimentIdFreeLargeAllocator);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_HTTP2_STATS_FIX
inline bool IsHttp2StatsFixEnabled() {
return IsExperimentEnabled(kExperimentIdHttp2StatsFix);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_KEEPALIVE_FIX
inline bool IsKeepaliveFixEnabled() {
return IsExperimentEnabled(kExperimentIdKeepaliveFix);

@ -44,7 +44,7 @@
description:
Avoid overriding call status of successfully finished calls if it races with
cancellation.
expiry: 2024/04/01
expiry: 2024/08/01
owner: vigneshbabu@google.com
test_tags: []
- name: call_v3
@ -98,6 +98,12 @@
expiry: 2024/08/01
owner: alishananda@google.com
test_tags: [resource_quota_test]
- name: http2_stats_fix
description:
Fix on HTTP2 outgoing data stats reporting
expiry: 2024/09/30
owner: yashkt@google.com
test_tags: []
- name: keepalive_fix
description:
Allows overriding keepalive_permit_without_calls.
@ -180,7 +186,7 @@
test_tags: [flow_control_test]
- name: schedule_cancellation_over_write
description: Allow cancellation op to be scheduled over a write
expiry: 2024/04/01
expiry: 2024/08/01
owner: vigneshbabu@google.com
test_tags: []
- name: server_privacy
@ -206,7 +212,7 @@
test_tags: ["endpoint_test", "flow_control_test"]
- name: trace_record_callops
description: Enables tracing of call batch initiation and completion.
expiry: 2024/04/01
expiry: 2024/08/01
owner: vigneshbabu@google.com
test_tags: []
- name: unconstrained_max_quota_buffer_size

@ -41,7 +41,7 @@
# Supported platforms: ios, windows, posix
- name: call_status_override_on_cancellation
default: debug
default: true
- name: call_v3
default: false
- name: canary_client_privacy
@ -109,7 +109,7 @@
- name: tcp_rcv_lowat
default: false
- name: trace_record_callops
default: false
default: true
- name: unconstrained_max_quota_buffer_size
default: false
- name: work_serializer_clears_time_cache

@ -56,13 +56,52 @@ struct Done<StatusFlag> {
static StatusFlag Make(bool cancelled) { return StatusFlag(!cancelled); }
};
template <typename T, typename SfinaeVoid = void>
struct NextValueTraits;
enum class NextValueType {
kValue,
kEndOfStream,
kError,
};
template <typename T>
struct NextValueTraits<T, absl::void_t<typename T::value_type>> {
using Value = typename T::value_type;
static NextValueType Type(const T& t) {
if (t.has_value()) return NextValueType::kValue;
if (t.cancelled()) return NextValueType::kError;
return NextValueType::kEndOfStream;
}
static Value& MutableValue(T& t) { return *t; }
};
template <typename T>
struct NextValueTraits<ValueOrFailure<absl::optional<T>>> {
using Value = T;
static NextValueType Type(const ValueOrFailure<absl::optional<T>>& t) {
if (t.ok()) {
if (t.value().has_value()) return NextValueType::kValue;
return NextValueType::kEndOfStream;
}
return NextValueType::kError;
}
static Value& MutableValue(ValueOrFailure<absl::optional<T>>& t) {
return **t;
}
};
template <typename Reader, typename Action>
class ForEach {
private:
using ReaderNext = decltype(std::declval<Reader>().Next());
using ReaderResult =
typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type;
using ReaderResultValue = typename ReaderResult::value_type;
using ReaderResultValue = typename NextValueTraits<ReaderResult>::Value;
using ActionFactory =
promise_detail::RepeatedPromiseFactory<ReaderResultValue, Action>;
using ActionPromise = typename ActionFactory::Promise;
@ -120,22 +159,37 @@ class ForEach {
Poll<Result> PollReaderNext() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollReaderNext", DebugTag().c_str());
gpr_log(GPR_INFO, "%s PollReaderNext", DebugTag().c_str());
}
auto r = reader_next_();
if (auto* p = r.value_if_ready()) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollReaderNext: got has_value=%s",
DebugTag().c_str(), p->has_value() ? "true" : "false");
}
if (p->has_value()) {
Destruct(&reader_next_);
auto action = action_factory_.Make(std::move(**p));
Construct(&in_action_, std::move(action), std::move(*p));
reading_next_ = false;
return PollAction();
} else {
return Done<Result>::Make(p->cancelled());
switch (NextValueTraits<ReaderResult>::Type(*p)) {
case NextValueType::kValue: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got value",
DebugTag().c_str());
}
Destruct(&reader_next_);
auto action = action_factory_.Make(
std::move(NextValueTraits<ReaderResult>::MutableValue(*p)));
Construct(&in_action_, std::move(action), std::move(*p));
reading_next_ = false;
return PollAction();
}
case NextValueType::kEndOfStream: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got end of stream",
DebugTag().c_str());
}
return Done<Result>::Make(false);
}
case NextValueType::kError: {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "%s PollReaderNext: got error",
DebugTag().c_str());
}
return Done<Result>::Make(true);
}
}
}
return Pending();
@ -143,7 +197,7 @@ class ForEach {
Poll<Result> PollAction() {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s PollAction", DebugTag().c_str());
gpr_log(GPR_INFO, "%s PollAction", DebugTag().c_str());
}
auto r = in_action_.promise();
if (auto* p = r.value_if_ready()) {

@ -89,7 +89,7 @@ class NextResult final {
const T& operator*() const;
T& operator*();
// Only valid if !has_value()
bool cancelled() { return cancelled_; }
bool cancelled() const { return cancelled_; }
private:
RefCountedPtr<pipe_detail::Center<T>> center_;

@ -2739,7 +2739,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
ScopedContext context(this);
args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers(
*args->path, args->registered_method, this->context());
send_initial_metadata_ = GetContext<Arena>()->MakePooled<ClientMetadata>();
send_initial_metadata_ = Arena::MakePooled<ClientMetadata>();
send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path));
if (args->authority.has_value()) {
send_initial_metadata_->Set(HttpAuthorityMetadata(),
@ -2818,7 +2818,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
}
RefCountedPtr<CallSpineInterface> MakeCallSpine(CallArgs call_args) final {
class WrappingCallSpine final : public CallSpineInterface {
class WrappingCallSpine final : public PipeBasedCallSpine {
public:
WrappingCallSpine(ClientPromiseBasedCall* call,
ClientMetadataHandle metadata)
@ -2859,14 +2859,14 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
return call_->server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override {
return cancel_error_;
}
Latch<bool>& was_cancelled_latch() override {
return was_cancelled_latch_;
}
Party& party() override { return *call_; }
Arena* arena() override { return call_->arena(); }
@ -2886,6 +2886,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
Pipe<ClientMetadataHandle> client_initial_metadata_{call_->arena()};
Pipe<ServerMetadataHandle> server_trailing_metadata_{call_->arena()};
Latch<ServerMetadataHandle> cancel_error_;
Latch<bool> was_cancelled_latch_;
};
GPR_ASSERT(call_args.server_initial_metadata ==
&server_initial_metadata_.sender);
@ -3700,11 +3701,12 @@ ServerPromiseBasedCall::MakeTopOfServerCallPromise(
///////////////////////////////////////////////////////////////////////////////
// CallSpine based Server Call
class ServerCallSpine final : public CallSpineInterface,
class ServerCallSpine final : public PipeBasedCallSpine,
public ServerCallContext,
public BasicPromiseBasedCall {
public:
ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena);
ServerCallSpine(ClientMetadataHandle client_initial_metadata,
ServerInterface* server, Channel* channel, Arena* arena);
// CallSpineInterface
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
@ -3719,10 +3721,8 @@ class ServerCallSpine final : public CallSpineInterface,
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Latch<bool>& was_cancelled_latch() override { return was_cancelled_latch_; }
Party& party() override { return *this; }
Arena* arena() override { return BasicPromiseBasedCall::arena(); }
void IncrementRefCount() override { InternalRef("CallSpine"); }
@ -3735,7 +3735,9 @@ class ServerCallSpine final : public CallSpineInterface,
}
void CancelWithError(grpc_error_handle error) override {
SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
std::ignore = Cancel(ServerMetadataFromStatus(error));
auto status = ServerMetadataFromStatus(error);
status->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(status));
return Empty{};
});
}
@ -3784,15 +3786,15 @@ class ServerCallSpine final : public CallSpineInterface,
Pipe<MessageHandle> client_to_server_messages_;
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_;
// Trailing metadata from server to client
Pipe<ServerMetadataHandle> server_trailing_metadata_;
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
Latch<bool> was_cancelled_latch_;
grpc_byte_buffer** recv_message_ = nullptr;
ClientMetadataHandle client_initial_metadata_stored_;
};
ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
ServerCallSpine::ServerCallSpine(ClientMetadataHandle client_initial_metadata,
ServerInterface* server, Channel* channel,
Arena* arena)
: BasicPromiseBasedCall(arena, 0, 1,
[channel, server]() -> grpc_call_create_args {
@ -3811,11 +3813,15 @@ ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel,
client_initial_metadata_(arena),
server_initial_metadata_(arena),
client_to_server_messages_(arena),
server_to_client_messages_(arena),
server_trailing_metadata_(arena) {
server_to_client_messages_(arena) {
global_stats().IncrementServerCallsCreated();
ScopedContext ctx(this);
channel->channel_stack()->InitServerCallSpine(this);
SpawnGuarded("push_client_initial_metadata",
[this, md = std::move(client_initial_metadata)]() mutable {
return Map(client_initial_metadata_.sender.Push(std::move(md)),
[](bool r) { return StatusFlag(r); });
});
}
void ServerCallSpine::PublishInitialMetadata(
@ -4081,10 +4087,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
metadata->Set(GrpcMessageMetadata(),
Slice(grpc_slice_copy(*details)));
}
GPR_ASSERT(metadata != nullptr);
return [this, metadata = std::move(metadata)]() mutable {
server_to_client_messages_.sender.Close();
return Map(server_trailing_metadata_.sender.Push(std::move(metadata)),
[](bool r) { return StatusFlag(r); });
GPR_ASSERT(metadata != nullptr);
return [this,
metadata = std::move(metadata)]() mutable -> Poll<Success> {
GPR_ASSERT(metadata != nullptr);
PushServerTrailingMetadata(std::move(metadata));
return Success{};
};
};
});
auto recv_message =
@ -4099,13 +4110,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
};
});
auto primary_ops = AllOk<StatusFlag>(
std::move(send_initial_metadata), std::move(send_message),
std::move(send_trailing_metadata), std::move(recv_message));
TrySeq(AllOk<StatusFlag>(std::move(send_initial_metadata),
std::move(send_message)),
std::move(send_trailing_metadata)),
std::move(recv_message));
if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
auto recv_trailing_metadata = MaybeOp(
ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
return Map(server_trailing_metadata_.receiver.AwaitClosed(),
return Map(WasCancelled(),
[cancelled, this](bool result) -> Success {
ResetDeadline();
*cancelled = result ? 1 : 0;
@ -4141,14 +4154,15 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
}
}
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
Channel* channel,
Arena* arena) {
return RefCountedPtr<ServerCallSpine>(
arena->New<ServerCallSpine>(server, channel, arena));
RefCountedPtr<CallSpineInterface> MakeServerCall(
ClientMetadataHandle client_initial_metadata, ServerInterface* server,
Channel* channel, Arena* arena) {
return RefCountedPtr<ServerCallSpine>(arena->New<ServerCallSpine>(
std::move(client_initial_metadata), server, channel, arena));
}
#else
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface*, Channel*,
RefCountedPtr<CallSpineInterface> MakeServerCall(ClientMetadataHandle,
ServerInterface*, Channel*,
Arena*) {
Crash("not implemented");
}

@ -158,9 +158,10 @@ class CallContext {
template <>
struct ContextType<CallContext> {};
RefCountedPtr<CallSpineInterface> MakeServerCall(ServerInterface* server,
Channel* channel,
Arena* arena);
// TODO(ctiller): remove once call-v3 finalized
RefCountedPtr<CallSpineInterface> MakeServerCall(
ClientMetadataHandle client_initial_metadata, ServerInterface* server,
Channel* channel, Arena* arena);
} // namespace grpc_core

@ -232,7 +232,8 @@ struct Server::RequestedCall {
data.registered.optional_payload = optional_payload;
}
void Complete(NextResult<MessageHandle> payload, ClientMetadata& md) {
template <typename OptionalPayload>
void Complete(OptionalPayload payload, ClientMetadata& md) {
Timestamp deadline = GetContext<CallContext>()->deadline();
switch (type) {
case RequestedCall::Type::BATCH_CALL:
@ -1301,9 +1302,10 @@ Server::ChannelData::~ChannelData() {
Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); }
absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) {
SetRegisteredMethodOnMetadata(client_initial_metadata);
auto call = MakeServerCall(server_.get(), channel_.get(), arena);
ClientMetadataHandle client_initial_metadata, Arena* arena) {
SetRegisteredMethodOnMetadata(*client_initial_metadata);
auto call = MakeServerCall(std::move(client_initial_metadata), server_.get(),
channel_.get(), arena);
InitCall(call);
return CallInitiator(std::move(call));
}
@ -1427,10 +1429,10 @@ void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
call->SpawnGuarded("request_matcher", [this, call]() {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
Map(call->PullClientInitialMetadata(),
[](ValueOrFailure<ClientMetadataHandle> md)
-> absl::StatusOr<ClientMetadataHandle> {
if (!md.has_value()) {
if (!md.ok()) {
return absl::InternalError("Missing metadata");
}
if (!md.value()->get_pointer(HttpPathMetadata())) {
@ -1456,24 +1458,19 @@ void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call]() {
return call->client_to_server_messages().receiver.Next();
},
[]() -> NextResult<MessageHandle> {
return NextResult<MessageHandle>();
[call]() { return call->PullClientToServerMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
Map(std::move(maybe_read_first_message),
[](NextResult<MessageHandle> n) {
return ValueOrFailure<NextResult<MessageHandle>>{
std::move(n)};
}),
rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable {
std::move(maybe_read_first_message), rm->MatchRequest(cq_idx()),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[](std::tuple<NextResult<MessageHandle>,
[](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {

@ -246,7 +246,7 @@ class Server : public ServerInterface,
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) override;
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
private:
class ConnectivityWatcher;

@ -147,8 +147,7 @@ class BatchBuilder {
absl::string_view name() const override { return "receive_message"; }
MessageHandle IntoMessageHandle() {
return GetContext<Arena>()->MakePooled<Message>(std::move(*payload),
flags);
return Arena::MakePooled<Message>(std::move(*payload), flags);
}
absl::optional<SliceBuffer> payload;
@ -161,7 +160,7 @@ class BatchBuilder {
using PendingCompletion::PendingCompletion;
Arena::PoolPtr<grpc_metadata_batch> metadata =
GetContext<Arena>()->MakePooled<grpc_metadata_batch>();
Arena::MakePooled<grpc_metadata_batch>();
protected:
~PendingReceiveMetadata() = default;
@ -328,7 +327,7 @@ inline auto BatchBuilder::SendClientTrailingMetadata(Target target) {
auto* pc = batch->GetInitializedCompletion(&Batch::pending_sends);
batch->batch.on_complete = &pc->on_done_closure;
batch->batch.send_trailing_metadata = true;
auto metadata = GetContext<Arena>()->MakePooled<grpc_metadata_batch>();
auto metadata = Arena::MakePooled<grpc_metadata_batch>();
payload_->send_trailing_metadata.send_trailing_metadata = metadata.get();
payload_->send_trailing_metadata.sent = nullptr;
pc->send_trailing_metadata = std::move(metadata);

@ -18,16 +18,7 @@
namespace grpc_core {
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
ClientMetadataHandle client_initial_metadata) {
// Send initial metadata.
call_initiator.SpawnGuarded(
"send_initial_metadata",
[client_initial_metadata = std::move(client_initial_metadata),
call_initiator]() mutable {
return call_initiator.PushClientInitialMetadata(
std::move(client_initial_metadata));
});
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
// Read messages from handler into initiator.
call_handler.SpawnGuarded("read_messages", [call_handler,
call_initiator]() mutable {
@ -88,10 +79,10 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
})),
call_initiator.PullServerTrailingMetadata(),
[call_handler](ServerMetadataHandle md) mutable {
call_handler.SpawnGuarded(
"recv_trailing_metadata",
[md = std::move(md), call_handler]() mutable {
return call_handler.PushServerTrailingMetadata(std::move(md));
call_handler.SpawnInfallible(
"recv_trailing", [call_handler, md = std::move(md)]() mutable {
call_handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
return Empty{};
});
@ -99,9 +90,12 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
}
CallInitiatorAndHandler MakeCall(
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) {
auto spine = CallSpine::Create(event_engine, arena);
return {CallInitiator(spine), CallHandler(spine)};
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = CallSpine::Create(std::move(client_initial_metadata),
event_engine, arena, is_arena_owned);
return {CallInitiator(spine), UnstartedCallHandler(spine)};
}
} // namespace grpc_core

@ -25,6 +25,7 @@
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/message.h"
@ -42,12 +43,6 @@ namespace grpc_core {
class CallSpineInterface {
public:
virtual ~CallSpineInterface() = default;
virtual Pipe<ClientMetadataHandle>& client_initial_metadata() = 0;
virtual Pipe<ServerMetadataHandle>& server_initial_metadata() = 0;
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Pipe<ServerMetadataHandle>& server_trailing_metadata() = 0;
virtual Latch<ServerMetadataHandle>& cancel_latch() = 0;
// Add a callback to be called when server trailing metadata is received.
void OnDone(absl::AnyInvocable<void()> fn) {
if (on_done_ == nullptr) {
@ -67,33 +62,24 @@ class CallSpineInterface {
virtual void IncrementRefCount() = 0;
virtual void Unref() = 0;
// Cancel the call with the given metadata.
// Regarding the `MUST_USE_RESULT absl::nullopt_t`:
// Most cancellation calls right now happen in pipe interceptors;
// there `nullopt` indicates terminate processing of this pipe and close with
// error.
// It's convenient then to have the Cancel operation (setting the latch to
// terminate the call) be the last thing that occurs in a pipe interceptor,
// and this construction supports that (and has helped the author not write
// some bugs).
GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
auto& c = cancel_latch();
if (c.is_set()) return absl::nullopt;
c.Set(std::move(metadata));
CallOnDone();
client_initial_metadata().sender.CloseWithError();
server_initial_metadata().sender.CloseWithError();
client_to_server_messages().sender.CloseWithError();
server_to_client_messages().sender.CloseWithError();
server_trailing_metadata().sender.CloseWithError();
return absl::nullopt;
}
auto WaitForCancel() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return cancel_latch().Wait();
}
virtual Promise<ValueOrFailure<absl::optional<ServerMetadataHandle>>>
PullServerInitialMetadata() = 0;
virtual Promise<ServerMetadataHandle> PullServerTrailingMetadata() = 0;
virtual Promise<StatusFlag> PushClientToServerMessage(
MessageHandle message) = 0;
virtual Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullClientToServerMessage() = 0;
virtual Promise<StatusFlag> PushServerToClientMessage(
MessageHandle message) = 0;
virtual Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullServerToClientMessage() = 0;
virtual void PushServerTrailingMetadata(ServerMetadataHandle md) = 0;
virtual void FinishSends() = 0;
virtual Promise<ValueOrFailure<ClientMetadataHandle>>
PullClientInitialMetadata() = 0;
virtual Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) = 0;
virtual Promise<bool> WasCancelled() = 0;
// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
@ -105,7 +91,7 @@ class CallSpineInterface {
using ResultType = typename P::Result;
return Map(std::move(promise), [this](ResultType r) {
if (!IsStatusOk(r)) {
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(r));
PushServerTrailingMetadata(StatusCast<ServerMetadataHandle>(r));
}
return r;
});
@ -121,7 +107,8 @@ class CallSpineInterface {
// Spawn a promise that returns some status-like type; if the status
// represents failure automatically cancel the rest of the call.
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
using FactoryType =
promise_detail::OncePromiseFactory<void, PromiseFactory>;
using PromiseType = typename FactoryType::Promise;
@ -130,27 +117,158 @@ class CallSpineInterface {
std::is_same<bool,
decltype(IsStatusOk(std::declval<ResultType>()))>::value,
"SpawnGuarded promise must return a status-like object");
party().Spawn(name, std::move(promise_factory), [this](ResultType r) {
if (!IsStatusOk(r)) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "SpawnGuarded sees failure: %s",
r.ToString().c_str());
}
std::ignore = Cancel(StatusCast<ServerMetadataHandle>(std::move(r)));
}
});
party().Spawn(
name, std::move(promise_factory), [this, whence](ResultType r) {
if (!IsStatusOk(r)) {
if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_INFO, "SpawnGuarded sees failure: %s (source: %s:%d)",
r.ToString().c_str(), whence.file(), whence.line());
}
auto status = StatusCast<ServerMetadataHandle>(std::move(r));
status->Set(GrpcCallWasCancelled(), true);
PushServerTrailingMetadata(std::move(status));
}
});
}
private:
absl::AnyInvocable<void()> on_done_{nullptr};
};
class CallSpine final : public CallSpineInterface, public Party {
// Implementation of CallSpine atop the v2 Pipe based arrangement.
// This implementation will go away in favor of an implementation atop
// CallFilters by the time v3 lands.
class PipeBasedCallSpine : public CallSpineInterface {
public:
virtual Pipe<ClientMetadataHandle>& client_initial_metadata() = 0;
virtual Pipe<ServerMetadataHandle>& server_initial_metadata() = 0;
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Latch<ServerMetadataHandle>& cancel_latch() = 0;
virtual Latch<bool>& was_cancelled_latch() = 0;
Promise<ValueOrFailure<absl::optional<ServerMetadataHandle>>>
PullServerInitialMetadata() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_initial_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> md)
-> ValueOrFailure<absl::optional<ServerMetadataHandle>> {
if (!md.has_value()) {
if (md.cancelled()) return Failure{};
return absl::optional<ServerMetadataHandle>();
}
return absl::optional<ServerMetadataHandle>(std::move(*md));
});
}
Promise<ServerMetadataHandle> PullServerTrailingMetadata() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return cancel_latch().Wait();
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullServerToClientMessage() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_to_client_messages().receiver.Next(), MapNextMessage);
}
Promise<StatusFlag> PushClientToServerMessage(MessageHandle message) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_to_server_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
}
Promise<ValueOrFailure<absl::optional<MessageHandle>>>
PullClientToServerMessage() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_to_server_messages().receiver.Next(), MapNextMessage);
}
Promise<StatusFlag> PushServerToClientMessage(MessageHandle message) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(server_to_client_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
}
void FinishSends() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
client_to_server_messages().sender.Close();
}
void PushServerTrailingMetadata(ServerMetadataHandle metadata) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
auto& c = cancel_latch();
if (c.is_set()) return;
const bool was_cancelled =
metadata->get(GrpcCallWasCancelled()).value_or(false);
c.Set(std::move(metadata));
CallOnDone();
was_cancelled_latch().Set(was_cancelled);
client_initial_metadata().sender.CloseWithError();
server_initial_metadata().sender.Close();
client_to_server_messages().sender.CloseWithError();
server_to_client_messages().sender.Close();
}
Promise<bool> WasCancelled() final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return was_cancelled_latch().Wait();
}
Promise<ValueOrFailure<ClientMetadataHandle>> PullClientInitialMetadata()
final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return Map(client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
}
Promise<StatusFlag> PushServerInitialMetadata(
absl::optional<ServerMetadataHandle> md) final {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return If(
md.has_value(),
[&md, this]() {
return Map(server_initial_metadata().sender.Push(std::move(*md)),
[](bool ok) { return StatusFlag(ok); });
},
[this]() {
server_initial_metadata().sender.Close();
return []() -> StatusFlag { return Success{}; };
});
}
private:
static ValueOrFailure<absl::optional<MessageHandle>> MapNextMessage(
NextResult<MessageHandle> r) {
if (!r.has_value()) {
if (r.cancelled()) return Failure{};
return absl::optional<MessageHandle>();
}
return absl::optional<MessageHandle>(std::move(*r));
}
};
class CallSpine final : public PipeBasedCallSpine, public Party {
public:
static RefCountedPtr<CallSpine> Create(
grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena) {
return RefCountedPtr<CallSpine>(arena->New<CallSpine>(event_engine, arena));
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned) {
auto spine = RefCountedPtr<CallSpine>(
arena->New<CallSpine>(event_engine, arena, is_arena_owned));
spine->SpawnInfallible(
"push_client_initial_metadata",
[spine = spine.get(), client_initial_metadata = std::move(
client_initial_metadata)]() mutable {
return Map(spine->client_initial_metadata_.sender.Push(
std::move(client_initial_metadata)),
[](bool) { return Empty{}; });
});
return spine;
}
Pipe<ClientMetadataHandle>& client_initial_metadata() override {
@ -165,10 +283,8 @@ class CallSpine final : public CallSpineInterface, public Party {
Pipe<MessageHandle>& server_to_client_messages() override {
return server_to_client_messages_;
}
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Latch<bool>& was_cancelled_latch() override { return was_cancelled_latch_; }
Party& party() override { return *this; }
Arena* arena() override { return arena_; }
void IncrementRefCount() override { Party::IncrementRefCount(); }
@ -177,8 +293,11 @@ class CallSpine final : public CallSpineInterface, public Party {
private:
friend class Arena;
CallSpine(grpc_event_engine::experimental::EventEngine* event_engine,
Arena* arena)
: Party(1), arena_(arena), event_engine_(event_engine) {}
Arena* arena, bool is_arena_owned)
: Party(1),
arena_(arena),
is_arena_owned_(is_arena_owned),
event_engine_(event_engine) {}
class ScopedContext : public ScopedActivity,
public promise_detail::Context<Arena> {
@ -208,6 +327,7 @@ class CallSpine final : public CallSpineInterface, public Party {
}
Arena* arena_;
bool is_arena_owned_;
// Initial metadata from client to server
Pipe<ClientMetadataHandle> client_initial_metadata_{arena()};
// Initial metadata from server to client
@ -216,10 +336,9 @@ class CallSpine final : public CallSpineInterface, public Party {
Pipe<MessageHandle> client_to_server_messages_{arena()};
// Messages travelling from the transport to the application.
Pipe<MessageHandle> server_to_client_messages_{arena()};
// Trailing metadata from server to client
Pipe<ServerMetadataHandle> server_trailing_metadata_{arena()};
// Latch that can be set to terminate the call
Latch<ServerMetadataHandle> cancel_latch_;
Latch<bool> was_cancelled_latch_;
// Event engine associated with this call
grpc_event_engine::experimental::EventEngine* const event_engine_;
};
@ -229,73 +348,31 @@ class CallInitiator {
explicit CallInitiator(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
auto PushClientInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
auto PullServerInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().receiver.Next(),
[](NextResult<ServerMetadataHandle> md)
-> ValueOrFailure<absl::optional<ServerMetadataHandle>> {
if (!md.has_value()) {
if (md.cancelled()) return Failure{};
return absl::optional<ServerMetadataHandle>();
}
return absl::optional<ServerMetadataHandle>(std::move(*md));
});
}
auto PullServerTrailingMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return PrioritizedRace(
Seq(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](NextResult<ServerMetadataHandle> md) mutable {
return [md = std::move(md),
spine]() mutable -> Poll<ServerMetadataHandle> {
// If the pipe was closed at cancellation time, we'll see no
// value here. Return pending and allow the cancellation to win
// the race.
if (!md.has_value()) return Pending{};
spine->server_trailing_metadata().sender.Close();
return std::move(*md);
};
}),
Map(spine_->WaitForCancel(),
[spine = spine_](ServerMetadataHandle md) -> ServerMetadataHandle {
spine->server_trailing_metadata().sender.CloseWithError();
return md;
}));
}
auto PullMessage() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->server_to_client_messages().receiver.Next();
return spine_->PullServerInitialMetadata();
}
auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(
spine_->client_to_server_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); });
return spine_->PushClientToServerMessage(std::move(message));
}
void FinishSends() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->client_to_server_messages().sender.Close();
}
void FinishSends() { spine_->FinishSends(); }
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
auto PullMessage() { return spine_->PullServerToClientMessage(); }
auto PullServerTrailingMetadata() {
return spine_->PullServerTrailingMetadata();
}
void Cancel() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore =
spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError()));
auto status = ServerMetadataFromStatus(absl::CancelledError());
status->Set(GrpcCallWasCancelled(), true);
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
@ -327,55 +404,59 @@ class CallHandler {
: spine_(std::move(spine)) {}
auto PullClientInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
return spine_->PullClientInitialMetadata();
}
auto PushServerInitialMetadata(absl::optional<ServerMetadataHandle> md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return If(
md.has_value(),
[&md, this]() {
return Map(
spine_->server_initial_metadata().sender.Push(std::move(*md)),
[](bool ok) { return StatusFlag(ok); });
},
[this]() {
spine_->server_initial_metadata().sender.Close();
return []() -> StatusFlag { return Success{}; };
});
return spine_->PushServerInitialMetadata(std::move(md));
}
auto PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->server_initial_metadata().sender.Close();
spine_->server_to_client_messages().sender.Close();
spine_->client_to_server_messages().receiver.CloseWithError();
spine_->CallOnDone();
return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
void PushServerTrailingMetadata(ServerMetadataHandle status) {
spine_->PushServerTrailingMetadata(std::move(status));
}
auto PullMessage() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->client_to_server_messages().receiver.Next();
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(
spine_->server_to_client_messages().sender.Push(std::move(message)),
[](bool ok) { return StatusFlag(ok); });
return spine_->PushServerToClientMessage(std::move(message));
}
auto PullMessage() { return spine_->PullClientToServerMessage(); }
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
spine_->SpawnGuarded(name, std::move(promise_factory), whence);
}
void Cancel(ServerMetadataHandle status) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore = spine_->Cancel(std::move(status));
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}
template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
Arena* arena() { return spine_->arena(); }
private:
RefCountedPtr<CallSpineInterface> spine_;
};
class UnstartedCallHandler {
public:
explicit UnstartedCallHandler(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
void PushServerTrailingMetadata(ServerMetadataHandle status) {
spine_->PushServerTrailingMetadata(std::move(status));
}
void OnDone(absl::AnyInvocable<void()> fn) { spine_->OnDone(std::move(fn)); }
@ -386,8 +467,9 @@ class CallHandler {
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
spine_->SpawnGuarded(name, std::move(promise_factory), whence);
}
template <typename PromiseFactory>
@ -400,6 +482,11 @@ class CallHandler {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
CallHandler V2HackToStartCallWithoutACallFilterStack() {
GPR_ASSERT(DownCast<PipeBasedCallSpine*>(spine_.get()) != nullptr);
return CallHandler(std::move(spine_));
}
Arena* arena() { return spine_->arena(); }
private:
@ -408,11 +495,13 @@ class CallHandler {
struct CallInitiatorAndHandler {
CallInitiator initiator;
CallHandler handler;
UnstartedCallHandler handler;
};
CallInitiatorAndHandler MakeCall(
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena);
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena,
bool is_arena_owned);
template <typename CallHalf>
auto OutgoingMessages(CallHalf h) {
@ -425,8 +514,7 @@ auto OutgoingMessages(CallHalf h) {
// Forward a call from `call_handler` to `call_initiator` (with initial metadata
// `client_initial_metadata`)
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator,
ClientMetadataHandle client_initial_metadata);
void ForwardCall(CallHandler call_handler, CallInitiator call_initiator);
} // namespace grpc_core

@ -559,7 +559,7 @@ class ServerTransport {
// Create a call at the server (or fail)
// arena must have been previously allocated by CreateArena()
virtual absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) = 0;
ClientMetadataHandle client_initial_metadata, Arena* arena) = 0;
protected:
~Acceptor() = default;

@ -102,13 +102,8 @@ const grpc_channel_filter test_filter = {
return Immediate(ServerMetadataFromStatus(
absl::PermissionDeniedError("Failure that's not preventable.")));
},
[](grpc_channel_element*, CallSpineInterface* args) {
args->client_initial_metadata().receiver.InterceptAndMap(
[args](ClientMetadataHandle) {
return args->Cancel(
ServerMetadataFromStatus(absl::PermissionDeniedError(
"Failure that's not preventable.")));
});
[](grpc_channel_element*, CallSpineInterface*) {
Crash("Should never be called");
},
grpc_channel_next_op,
sizeof(call_data),

@ -193,6 +193,9 @@ class NewFakeStatsPlugin : public FakeStatsPlugin {
// This test verifies the HTTP2 stats on a stream
CORE_END2END_TEST(Http2FullstackSingleHopTest, StreamStats) {
if (!IsHttp2StatsFixEnabled()) {
GTEST_SKIP() << "Test needs http2_stats_fix experiment to be enabled";
}
g_mu = new Mutex();
g_client_call_ended_notify = new Notification();
g_server_call_ended_notify = new Notification();

@ -105,22 +105,21 @@ struct MockPromiseEndpoint {
auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
return Loop([initiator, num_messages]() mutable {
bool has_message = (num_messages > 0);
return If(
has_message,
Seq(initiator.PushMessage(GetContext<Arena>()->MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
}),
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
return If(has_message,
Seq(initiator.PushMessage(Arena::MakePooled<Message>()),
[&num_messages]() -> LoopCtl<absl::Status> {
--num_messages;
return Continue();
}),
[initiator]() mutable -> LoopCtl<absl::Status> {
initiator.FinishSends();
return absl::OkStatus();
});
});
}
ClientMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ClientMetadata>();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/test"));
return md;
}
@ -178,14 +177,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
call.initiator.SpawnInfallible(
@ -193,7 +191,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -224,14 +222,13 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
call.initiator.SpawnInfallible(
@ -239,7 +236,7 @@ TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -278,22 +275,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call1.handler));
auto call2 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call2.handler));
call1.initiator.SpawnGuarded("test-send-1", [initiator =
call1.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call2.initiator.SpawnGuarded("test-send-2", [initiator =
call2.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(
"test-send-1", [initiator = call1.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call2.initiator.SpawnGuarded(
"test-send-2", [initiator = call2.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done1;
EXPECT_CALL(on_done1, Call());
StrictMock<MockFunction<void()>> on_done2;
@ -303,7 +300,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -319,7 +316,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -350,22 +347,22 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call1 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call1.handler));
auto call2 =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call2.handler));
call1.initiator.SpawnGuarded("test-send", [initiator =
call1.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call2.initiator.SpawnGuarded("test-send", [initiator =
call2.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
auto call1 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call1.handler.V2HackToStartCallWithoutACallFilterStack());
auto call2 = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(
call2.handler.V2HackToStartCallWithoutACallFilterStack());
call1.initiator.SpawnGuarded(
"test-send", [initiator = call1.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call2.initiator.SpawnGuarded(
"test-send", [initiator = call2.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
StrictMock<MockFunction<void()>> on_done1;
EXPECT_CALL(on_done1, Call());
StrictMock<MockFunction<void()>> on_done2;
@ -375,7 +372,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
@ -391,7 +388,7 @@ TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
return Seq(
initiator.PullServerInitialMetadata(),
[](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_FALSE(md.ok());
EXPECT_TRUE(md.ok());
return Empty{};
},
initiator.PullServerTrailingMetadata(),

@ -67,7 +67,7 @@ const uint8_t kGrpcStatus0[] = {0x10, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30};
ClientMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ClientMetadata>();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step"));
return md;
}
@ -78,7 +78,7 @@ auto SendClientToServerMessages(CallInitiator initiator, int num_messages) {
bool has_message = (i < num_messages);
return If(
has_message,
Seq(initiator.PushMessage(GetContext<Arena>()->MakePooled<Message>(
Seq(initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(std::to_string(i))), 0)),
[&i]() -> LoopCtl<absl::Status> {
++i;
@ -115,9 +115,9 @@ TEST_F(TransportTest, AddOneStream) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(1024, memory_allocator()));
transport->StartCall(std::move(call.handler));
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(1024, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -133,11 +133,10 @@ TEST_F(TransportTest, AddOneStream) {
{EventEngineSlice::FromCopiedString("0"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 1));
});
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 1);
});
call.initiator.SpawnInfallible(
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
@ -152,18 +151,23 @@ TEST_F(TransportTest, AddOneStream) {
"/demo.Service/Step");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
[initiator]() mutable { return initiator.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[initiator]() mutable { return initiator.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
initiator.PullServerTrailingMetadata(),
[initiator]() mutable {
return initiator.PullServerTrailingMetadata();
},
[&on_done](ServerMetadataHandle md) {
EXPECT_EQ(md->get(GrpcStatusMetadata()).value(), GRPC_STATUS_OK);
on_done.Call();
@ -198,9 +202,9 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
std::move(control_endpoint.promise_endpoint),
std::move(data_endpoint.promise_endpoint), MakeChannelArgs(),
event_engine(), HPackParser(), HPackCompressor());
auto call =
MakeCall(event_engine().get(), Arena::Create(8192, memory_allocator()));
transport->StartCall(std::move(call.handler));
auto call = MakeCall(TestInitialMetadata(), event_engine().get(),
Arena::Create(8192, memory_allocator()), true);
transport->StartCall(call.handler.V2HackToStartCallWithoutACallFilterStack());
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
control_endpoint.ExpectWrite(
@ -221,11 +225,10 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
{EventEngineSlice::FromCopiedString("1"), Zeros(63)}, nullptr);
control_endpoint.ExpectWrite(
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, 0)}, nullptr);
call.initiator.SpawnGuarded("test-send", [initiator =
call.initiator]() mutable {
return TrySeq(initiator.PushClientInitialMetadata(TestInitialMetadata()),
SendClientToServerMessages(initiator, 2));
});
call.initiator.SpawnGuarded("test-send",
[initiator = call.initiator]() mutable {
return SendClientToServerMessages(initiator, 2);
});
call.initiator.SpawnInfallible(
"test-read", [&on_done, initiator = call.initiator]() mutable {
return Seq(
@ -241,20 +244,25 @@ TEST_F(TransportTest, AddOneStreamMultipleMessages) {
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "87654321");
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"87654321");
return Empty{};
},
initiator.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
initiator.PullServerTrailingMetadata(),

@ -71,13 +71,13 @@ const uint8_t kGrpcStatus0[] = {0x40, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30};
ServerMetadataHandle TestInitialMetadata() {
auto md = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step"));
return md;
}
ServerMetadataHandle TestTrailingMetadata() {
auto md = GetContext<Arena>()->MakePooled<ServerMetadata>();
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_OK);
return md;
}
@ -87,7 +87,7 @@ class MockAcceptor : public ServerTransport::Acceptor {
virtual ~MockAcceptor() = default;
MOCK_METHOD(Arena*, CreateArena, (), (override));
MOCK_METHOD(absl::StatusOr<CallInitiator>, CreateCall,
(ClientMetadata & client_initial_metadata, Arena* arena),
(ClientMetadataHandle client_initial_metadata, Arena* arena),
(override));
};
@ -113,18 +113,59 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
// Once that's read we'll create a new call
auto* call_arena = Arena::Create(1024, memory_allocator());
CallInitiatorAndHandler call = MakeCall(event_engine().get(), call_arena);
EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena));
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(acceptor, CreateCall(_, call_arena))
.WillOnce(WithArgs<0>([call_initiator = std::move(call.initiator)](
ClientMetadata& client_initial_metadata) {
EXPECT_EQ(client_initial_metadata.get_pointer(HttpPathMetadata())
.WillOnce(WithArgs<0>([this, call_arena, &on_done](
ClientMetadataHandle client_initial_metadata) {
EXPECT_EQ(client_initial_metadata->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return call_initiator;
CallInitiatorAndHandler call =
MakeCall(std::move(client_initial_metadata), event_engine().get(),
call_arena, true);
auto handler = call.handler.V2HackToStartCallWithoutACallFilterStack();
handler.SpawnInfallible("test-io", [&on_done, handler]() mutable {
return Seq(
handler.PullClientInitialMetadata(),
[](ValueOrFailure<ClientMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(md.value()
->get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
[handler]() mutable { return handler.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"12345678");
return Empty{};
},
[handler]() mutable { return handler.PullMessage(); },
[](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return Empty{};
},
[handler]() mutable {
return handler.PushServerInitialMetadata(TestInitialMetadata());
},
[handler]() mutable {
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("87654321")), 0));
},
[handler, &on_done]() mutable {
handler.PushServerTrailingMetadata(TestTrailingMetadata());
on_done.Call();
return Empty{};
});
});
return std::move(call.initiator);
}));
transport->SetAcceptor(&acceptor);
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(on_done, Call());
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)
@ -145,39 +186,6 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
sizeof(kGrpcStatus0)),
EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))},
nullptr);
call.handler.SpawnInfallible(
"test-io", [&on_done, handler = call.handler]() mutable {
return Seq(
handler.PullClientInitialMetadata(),
[](ValueOrFailure<ServerMetadataHandle> md) {
EXPECT_TRUE(md.ok());
EXPECT_EQ(
md.value()->get_pointer(HttpPathMetadata())->as_string_view(),
"/demo.Service/Step");
return Empty{};
},
handler.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678");
return Empty{};
},
handler.PullMessage(),
[](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
return Empty{};
},
handler.PushServerInitialMetadata(TestInitialMetadata()),
handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("87654321")), 0)),
[handler]() mutable {
return handler.PushServerTrailingMetadata(TestTrailingMetadata());
},
[&on_done]() mutable {
on_done.Call();
return Empty{};
});
});
// Wait until ClientTransport's internal activities to finish.
event_engine()->TickUntilIdle();
event_engine()->UnsetGlobalHooks();

@ -63,21 +63,17 @@ void FillMetadata(const std::vector<std::pair<std::string, std::string>>& md,
TRANSPORT_TEST(UnaryWithSomeContent) {
SetServerAcceptor();
auto initiator = CreateCall();
const auto client_initial_metadata = RandomMetadata();
const auto server_initial_metadata = RandomMetadata();
const auto server_trailing_metadata = RandomMetadata();
const auto client_payload = RandomMessage();
const auto server_payload = RandomMessage();
auto md = Arena::MakePooled<ClientMetadata>();
FillMetadata(client_initial_metadata, *md);
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
FillMetadata(client_initial_metadata, *md);
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_payload)), 0));
},
@ -93,14 +89,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
UnorderedElementsAreArray(server_initial_metadata));
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), server_payload);
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
server_payload);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -118,14 +116,16 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
UnorderedElementsAreArray(client_initial_metadata));
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), client_payload);
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
client_payload);
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
FillMetadata(server_initial_metadata, *md);
return handler.PushServerInitialMetadata(std::move(md));
@ -139,10 +139,7 @@ TRANSPORT_TEST(UnaryWithSomeContent) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
FillMetadata(server_trailing_metadata, *md);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();

@ -18,16 +18,12 @@ namespace grpc_core {
TRANSPORT_TEST(MetadataOnlyRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
initiator.FinishSends();
return initiator.PullServerInitialMetadata();
},
@ -53,8 +49,9 @@ TRANSPORT_TEST(MetadataOnlyRequest) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -63,10 +60,7 @@ TRANSPORT_TEST(MetadataOnlyRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -79,16 +73,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
@ -123,10 +113,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -139,16 +126,12 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
// We don't close the sending stream here.
return initiator.PullServerInitialMetadata();
},
@ -175,10 +158,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
// and don't send initial metadata - just trailing metadata.
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -186,18 +166,9 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
SetServerAcceptor();
auto initiator = CreateCall();
SpawnTestSeq(
initiator, "start-call",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return Empty{};
});
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
auto handler = TickUntilServerCall();
SpawnTestSeq(initiator, "end-call", [&]() {
initiator.Cancel();
@ -208,16 +179,12 @@ TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
TRANSPORT_TEST(UnaryRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
@ -233,15 +200,16 @@ TRANSPORT_TEST(UnaryRequest) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -259,14 +227,16 @@ TRANSPORT_TEST(UnaryRequest) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -280,10 +250,7 @@ TRANSPORT_TEST(UnaryRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -291,16 +258,12 @@ TRANSPORT_TEST(UnaryRequest) {
TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[&]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("hello world")), 0));
},
@ -316,9 +279,10 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullServerTrailingMetadata();
},
@ -337,9 +301,11 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
"/foo/bar");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -353,10 +319,7 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -364,18 +327,12 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -389,15 +346,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -422,14 +380,16 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
@ -437,10 +397,7 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -448,18 +405,12 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
TRANSPORT_TEST(ClientStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -493,9 +444,9 @@ TRANSPORT_TEST(ClientStreamingRequest) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -520,40 +471,47 @@ TRANSPORT_TEST(ClientStreamingRequest) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (2)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (2)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (3)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (3)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (4)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (4)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "hello world (5)");
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"hello world (5)");
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();
@ -561,18 +519,12 @@ TRANSPORT_TEST(ClientStreamingRequest) {
TRANSPORT_TEST(ServerStreamingRequest) {
SetServerAcceptor();
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
SpawnTestSeq(
initiator, "initiator",
[&]() {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
return initiator.PushClientInitialMetadata(std::move(md));
},
[&](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
return initiator.PullServerInitialMetadata();
},
[&]() mutable { return initiator.PullServerInitialMetadata(); },
[&](ValueOrFailure<absl::optional<ServerMetadataHandle>> md) {
EXPECT_TRUE(md.ok());
EXPECT_TRUE(md.value().has_value());
@ -581,45 +533,51 @@ TRANSPORT_TEST(ServerStreamingRequest) {
initiator.FinishSends();
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (2)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (3)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (4)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (5)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
"why hello neighbor (6)");
return initiator.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[&](ValueOrFailure<ServerMetadataHandle> md) {
@ -644,9 +602,9 @@ TRANSPORT_TEST(ServerStreamingRequest) {
EXPECT_TRUE(result.ok());
return handler.PullMessage();
},
[&](NextResult<MessageHandle> msg) {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[&](ValueOrFailure<absl::optional<MessageHandle>> msg) {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return handler.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
},
@ -679,10 +637,7 @@ TRANSPORT_TEST(ServerStreamingRequest) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[&](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
WaitForAllPendingWork();

@ -30,19 +30,14 @@ TRANSPORT_TEST(ManyUnaryRequests) {
std::map<int, std::string> client_messages;
std::map<int, std::string> server_messages;
for (int i = 0; i < kNumRequests; i++) {
auto initiator = CreateCall();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromCopiedString(std::to_string(i)));
auto initiator = CreateCall(std::move(md));
client_messages[i] = RandomMessage();
server_messages[i] = RandomMessage();
SpawnTestSeq(
initiator, make_call_name(i, "initiator"),
[initiator, i]() mutable {
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(),
Slice::FromCopiedString(std::to_string(i)));
return initiator.PushClientInitialMetadata(std::move(md));
},
[initiator, i, &client_messages](StatusFlag status) mutable {
EXPECT_TRUE(status.ok());
[initiator, i, &client_messages]() mutable {
return initiator.PushMessage(Arena::MakePooled<Message>(
SliceBuffer(Slice::FromCopiedString(client_messages[i])), 0));
},
@ -59,16 +54,17 @@ TRANSPORT_TEST(ManyUnaryRequests) {
ContentTypeMetadata::kApplicationGrpc);
return initiator.PullMessage();
},
[initiator, i,
&server_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[initiator, i, &server_messages](
ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
server_messages[i]);
return initiator.PullMessage();
},
[initiator](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[initiator](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
return initiator.PullServerTrailingMetadata();
},
[initiator](ValueOrFailure<ServerMetadataHandle> md) mutable {
@ -92,16 +88,17 @@ TRANSPORT_TEST(ManyUnaryRequests) {
&*this_call_index));
return handler.PullMessage();
},
[handler, this_call_index,
&client_messages](NextResult<MessageHandle> msg) mutable {
EXPECT_TRUE(msg.has_value());
EXPECT_EQ(msg.value()->payload()->JoinIntoString(),
[handler, this_call_index, &client_messages](
ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_TRUE(msg.value().has_value());
EXPECT_EQ(msg.value().value()->payload()->JoinIntoString(),
client_messages[*this_call_index]);
return handler.PullMessage();
},
[handler](NextResult<MessageHandle> msg) mutable {
EXPECT_FALSE(msg.has_value());
EXPECT_FALSE(msg.cancelled());
[handler](ValueOrFailure<absl::optional<MessageHandle>> msg) mutable {
EXPECT_TRUE(msg.ok());
EXPECT_FALSE(msg.value().has_value());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(ContentTypeMetadata(), ContentTypeMetadata::kApplicationGrpc);
return handler.PushServerInitialMetadata(std::move(md));
@ -118,10 +115,7 @@ TRANSPORT_TEST(ManyUnaryRequests) {
EXPECT_TRUE(result.ok());
auto md = Arena::MakePooled<ServerMetadata>();
md->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
return handler.PushServerTrailingMetadata(std::move(md));
},
[handler](StatusFlag result) mutable {
EXPECT_TRUE(result.ok());
handler.PushServerTrailingMetadata(std::move(md));
return Empty{};
});
}

@ -56,12 +56,16 @@ void TransportTest::SetServerAcceptor() {
transport_pair_.server->server_transport()->SetAcceptor(&acceptor_);
}
CallInitiator TransportTest::CreateCall() {
auto call = MakeCall(event_engine_.get(), Arena::Create(1024, &allocator_));
call.handler.SpawnInfallible("start-call", [this, handler = call.handler]() {
transport_pair_.client->client_transport()->StartCall(handler);
return Empty{};
});
CallInitiator TransportTest::CreateCall(
ClientMetadataHandle client_initial_metadata) {
auto call = MakeCall(std::move(client_initial_metadata), event_engine_.get(),
Arena::Create(1024, &allocator_), true);
call.handler.SpawnInfallible(
"start-call", [this, handler = call.handler]() mutable {
transport_pair_.client->client_transport()->StartCall(
handler.V2HackToStartCallWithoutACallFilterStack());
return Empty{};
});
return std::move(call.initiator);
}
@ -231,9 +235,10 @@ Arena* TransportTest::Acceptor::CreateArena() {
}
absl::StatusOr<CallInitiator> TransportTest::Acceptor::CreateCall(
ClientMetadata&, Arena* arena) {
auto call = MakeCall(event_engine_, arena);
handlers_.push(std::move(call.handler));
ClientMetadataHandle client_initial_metadata, Arena* arena) {
auto call =
MakeCall(std::move(client_initial_metadata), event_engine_, arena, true);
handlers_.push(call.handler.V2HackToStartCallWithoutACallFilterStack());
return std::move(call.initiator);
}

@ -221,7 +221,7 @@ class TransportTest : public ::testing::Test {
rng_(rng) {}
void SetServerAcceptor();
CallInitiator CreateCall();
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata);
std::string RandomString(int min_length, int max_length,
absl::string_view character_set);
@ -272,7 +272,7 @@ class TransportTest : public ::testing::Test {
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadata& client_initial_metadata, Arena* arena) override;
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
absl::optional<CallHandler> PopHandler();
private:

@ -19,22 +19,25 @@ cd "$(dirname "$0")"
shopt -s nullglob
echo "Testing Python packages with input artifacts:"
ls "$EXTERNAL_GIT_ROOT"/input_artifacts
if [[ "$1" == "binary" ]]
then
echo "Testing Python binary distribution"
ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-[0-9]*.whl)
TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio_tools-[0-9]*.whl)
OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio_observability-[0-9]*.whl)
ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[-_0-9a-z.]*.whl)
TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*tools[-_0-9a-z.]*.whl)
OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*observability[-_0-9a-z.]*.whl)
else
echo "Testing Python source distribution"
ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-[0-9]*.tar.gz)
TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-tools-[0-9]*.tar.gz)
OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-observability-[0-9]*.tar.gz)
ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[-_0-9a-z.]*.tar.gz)
TOOLS_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*tools[-_0-9a-z.]*.tar.gz)
OBSERVABILITY_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*observability[-_0-9a-z.]*.tar.gz)
fi
HEALTH_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-health-checking-[0-9]*.tar.gz)
REFLECTION_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-reflection-[0-9]*.tar.gz)
TESTING_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio-testing-[0-9]*.tar.gz)
HEALTH_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*health[_-]*checking[-_0-9a-z.]*.tar.gz)
REFLECTION_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*reflection[-_0-9a-z.]*.tar.gz)
TESTING_ARCHIVES=("$EXTERNAL_GIT_ROOT"/input_artifacts/grpcio[_-]*testing[-_0-9a-z.]*.tar.gz)
VIRTUAL_ENV=$(mktemp -d)
python3 -m virtualenv "$VIRTUAL_ENV"

Loading…
Cancel
Save