[call-v3] Server path (#36509)

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #36509

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36509 from ctiller:transport-refs-3 2771a2b0e1
PiperOrigin-RevId: 633240374
pull/36543/head^2
Craig Tiller 7 months ago committed by Copybara-Service
parent c31b115a63
commit 0ecee5ad3f
  1. 4
      BUILD
  2. 3
      CMakeLists.txt
  3. 1
      Makefile
  4. 3
      Package.swift
  5. 21
      bazel/experiments.bzl
  6. 9
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 4
      gRPC-C++.podspec
  10. 5
      gRPC-Core.podspec
  11. 3
      grpc.gemspec
  12. 3
      package.xml
  13. 8
      src/core/BUILD
  14. 4
      src/core/ext/filters/logging/logging_filter.cc
  15. 76
      src/core/ext/transport/chaotic_good/server_transport.cc
  16. 12
      src/core/ext/transport/chaotic_good/server_transport.h
  17. 51
      src/core/ext/transport/inproc/inproc_transport.cc
  18. 2
      src/core/ext/transport/inproc/inproc_transport.h
  19. 26
      src/core/lib/channel/channel_stack.cc
  20. 1
      src/core/lib/channel/channel_stack.h
  21. 48
      src/core/lib/experiments/experiments.cc
  22. 8
      src/core/lib/experiments/experiments.h
  23. 11
      src/core/lib/experiments/experiments.yaml
  24. 2
      src/core/lib/experiments/rollouts.yaml
  25. 11
      src/core/lib/promise/detail/promise_like.h
  26. 2
      src/core/lib/promise/status_flag.h
  27. 1015
      src/core/lib/surface/call.cc
  28. 60
      src/core/lib/surface/call.h
  29. 84
      src/core/lib/surface/channel_init.cc
  30. 116
      src/core/lib/surface/channel_init.h
  31. 4
      src/core/lib/surface/init.cc
  32. 18
      src/core/lib/transport/call_destination.h
  33. 44
      src/core/lib/transport/call_filters.h
  34. 8
      src/core/lib/transport/call_spine.h
  35. 11
      src/core/lib/transport/interception_chain.h
  36. 67
      src/core/lib/transport/transport.h
  37. 3
      src/core/plugin_registry/grpc_plugin_registry.cc
  38. 632
      src/core/server/server.cc
  39. 33
      src/core/server/server.h
  40. 2
      src/core/server/server_interface.h
  41. 1
      src/python/grpcio/grpc_core_dependencies.py
  42. 3
      test/core/end2end/tests/filter_context.cc
  43. 33
      test/core/surface/channel_init_test.cc
  44. 6
      test/core/transport/chaotic_good/chaotic_good_server_test.cc
  45. 37
      test/core/transport/chaotic_good/server_transport_test.cc
  46. 2
      test/core/transport/test_suite/call_content.cc
  47. 18
      test/core/transport/test_suite/call_shapes.cc
  48. 6
      test/core/transport/test_suite/inproc_fixture.cc
  49. 4
      test/core/transport/test_suite/no_op.cc
  50. 2
      test/core/transport/test_suite/stress.cc
  51. 29
      test/core/transport/test_suite/test.cc
  52. 17
      test/core/transport/test_suite/test.h
  53. 1
      test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong_chaotic_good.cc
  54. 3
      tools/doxygen/Doxyfile.c++.internal
  55. 3
      tools/doxygen/Doxyfile.core.internal

@ -1942,9 +1942,7 @@ grpc_cc_library(
"promise",
"ref_counted_ptr",
"stats",
"//src/core:1999",
"//src/core:activity",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:cancel_callback",
"//src/core:channel_args",
@ -1958,6 +1956,7 @@ grpc_cc_library(
"//src/core:error",
"//src/core:error_utils",
"//src/core:experiments",
"//src/core:interception_chain",
"//src/core:iomgr_fwd",
"//src/core:map",
"//src/core:metadata_batch",
@ -2078,6 +2077,7 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
"//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_final_info",
"//src/core:call_finalization",

3
CMakeLists.txt generated

@ -2502,6 +2502,7 @@ add_library(grpc
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
@ -3255,6 +3256,7 @@ add_library(grpc_unsecure
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
@ -5366,6 +5368,7 @@ add_library(grpc_authorization_provider
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc

1
Makefile generated

@ -1387,6 +1387,7 @@ LIBGRPC_SRC = \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \

3
Package.swift generated

@ -1749,6 +1749,7 @@ let package = Package(
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_arena_allocator.cc",
"src/core/lib/transport/call_arena_allocator.h",
"src/core/lib/transport/call_destination.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",
@ -1761,6 +1762,8 @@ let package = Package(
"src/core/lib/transport/error_utils.cc",
"src/core/lib/transport/error_utils.h",
"src/core/lib/transport/http2_errors.h",
"src/core/lib/transport/interception_chain.cc",
"src/core/lib/transport/interception_chain.h",
"src/core/lib/transport/message.cc",
"src/core/lib/transport/message.h",
"src/core/lib/transport/metadata.cc",

@ -34,9 +34,8 @@ EXPERIMENT_ENABLES = {
"pending_queue_cap": "pending_queue_cap",
"pick_first_new": "pick_first_new",
"promise_based_client_call": "event_engine_client,event_engine_listener,promise_based_client_call",
"promise_based_server_call": "promise_based_server_call",
"chaotic_good": "chaotic_good,event_engine_client,event_engine_listener,promise_based_client_call,promise_based_server_call",
"promise_based_inproc_transport": "event_engine_client,event_engine_listener,promise_based_client_call,promise_based_inproc_transport,promise_based_server_call",
"chaotic_good": "chaotic_good,event_engine_client,event_engine_listener,promise_based_client_call",
"promise_based_inproc_transport": "event_engine_client,event_engine_listener,promise_based_client_call,promise_based_inproc_transport",
"rstpit": "rstpit",
"schedule_cancellation_over_write": "schedule_cancellation_over_write",
"server_privacy": "server_privacy",
@ -59,9 +58,6 @@ EXPERIMENTS = {
"dbg": {
},
"off": {
"core_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -73,9 +69,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"logging_test": [
"promise_based_server_call",
],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
@ -107,9 +100,6 @@ EXPERIMENTS = {
"dbg": {
},
"off": {
"core_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -121,9 +111,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
"logging_test": [
"promise_based_server_call",
],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
@ -149,7 +136,6 @@ EXPERIMENTS = {
"chaotic_good",
"event_engine_client",
"promise_based_client_call",
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@ -168,9 +154,6 @@ EXPERIMENTS = {
"lame_client_test": [
"promise_based_client_call",
],
"logging_test": [
"promise_based_server_call",
],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",

@ -1110,6 +1110,7 @@ libs:
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@ -1117,6 +1118,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@ -1921,6 +1923,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
@ -2609,6 +2612,7 @@ libs:
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@ -2616,6 +2620,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@ -3032,6 +3037,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
@ -4686,6 +4692,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_arena_allocator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@ -4693,6 +4700,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
- src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@ -4989,6 +4997,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
- src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc

1
config.m4 generated

@ -762,6 +762,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \

1
config.w32 generated

@ -727,6 +727,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\transport\\call_spine.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\error_utils.cc " +
"src\\core\\lib\\transport\\interception_chain.cc " +
"src\\core\\lib\\transport\\message.cc " +
"src\\core\\lib\\transport\\metadata.cc " +
"src\\core\\lib\\transport\\metadata_batch.cc " +

4
gRPC-C++.podspec generated

@ -1213,6 +1213,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@ -1220,6 +1221,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
@ -2482,6 +2484,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@ -2489,6 +2492,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',

5
gRPC-Core.podspec generated

@ -1864,6 +1864,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.cc',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
@ -1876,6 +1877,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/interception_chain.cc',
'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.cc',
@ -3261,6 +3264,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@ -3268,6 +3272,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',

3
grpc.gemspec generated

@ -1751,6 +1751,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_arena_allocator.cc )
s.files += %w( src/core/lib/transport/call_arena_allocator.h )
s.files += %w( src/core/lib/transport/call_destination.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )
@ -1763,6 +1764,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/error_utils.cc )
s.files += %w( src/core/lib/transport/error_utils.h )
s.files += %w( src/core/lib/transport/http2_errors.h )
s.files += %w( src/core/lib/transport/interception_chain.cc )
s.files += %w( src/core/lib/transport/interception_chain.h )
s.files += %w( src/core/lib/transport/message.cc )
s.files += %w( src/core/lib/transport/message.h )
s.files += %w( src/core/lib/transport/metadata.cc )

3
package.xml generated

@ -1733,6 +1733,7 @@
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_arena_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_arena_allocator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_destination.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.cc" role="src" />
@ -1745,6 +1746,8 @@
<file baseinstalldir="/" name="src/core/lib/transport/error_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/error_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/http2_errors.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/interception_chain.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/interception_chain.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/message.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/message.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/metadata.cc" role="src" />

@ -664,7 +664,10 @@ grpc_cc_library(
grpc_cc_library(
name = "promise_like",
external_deps = ["absl/meta:type_traits"],
external_deps = [
"absl/functional:any_invocable",
"absl/meta:type_traits",
],
language = "c++",
public_hdrs = [
"lib/promise/detail/promise_like.h",
@ -3130,6 +3133,7 @@ grpc_cc_library(
"channel_fwd",
"channel_stack_trace",
"channel_stack_type",
"interception_chain",
"//:channel_stack_builder",
"//:debug_location",
"//:gpr",
@ -3147,6 +3151,7 @@ grpc_cc_library(
deps = [
"channel_args",
"//:channelz",
"//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
@ -7087,6 +7092,7 @@ grpc_cc_library(
"experiments",
"iomgr_fwd",
"metadata_batch",
"resource_quota",
"slice",
"slice_buffer",
"status_helper",

@ -539,11 +539,11 @@ void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
->RegisterV2Filter<ServerLoggingFilter>(GRPC_SERVER_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
builder->channel_init()
->RegisterFilter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
->RegisterV2Filter<ClientLoggingFilter>(GRPC_CLIENT_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
});

@ -235,37 +235,27 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
FrameHeader frame_header, BufferPair buffers,
ChaoticGoodTransport& transport) {
ClientFragmentFrame fragment_frame;
ScopedArenaPtr arena(acceptor_->CreateArena());
ScopedArenaPtr arena(call_arena_allocator_->MakeArena());
absl::Status status = transport.DeserializeFrame(
frame_header, std::move(buffers), arena.get(), fragment_frame,
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
absl::optional<CallInitiator> call_initiator;
if (status.ok()) {
auto create_call_result = acceptor_->CreateCall(
std::move(fragment_frame.headers), arena.release());
if (grpc_chaotic_good_trace.enabled()) {
gpr_log(GPR_INFO,
"CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "
"create_call_result=%s",
create_call_result.ok()
? "ok"
: create_call_result.status().ToString().c_str());
}
if (create_call_result.ok()) {
call_initiator.emplace(std::move(*create_call_result));
auto add_result = NewStream(frame_header.stream_id, *call_initiator);
if (add_result.ok()) {
call_initiator->SpawnGuarded(
"server-write", [this, stream_id = frame_header.stream_id,
call_initiator = *call_initiator]() {
return CallOutboundLoop(stream_id, call_initiator);
});
} else {
call_initiator.reset();
status = add_result;
}
auto call =
MakeCallPair(std::move(fragment_frame.headers), event_engine_.get(),
arena.release(), call_arena_allocator_, nullptr);
call_initiator.emplace(std::move(call.initiator));
auto add_result = NewStream(frame_header.stream_id, *call_initiator);
if (add_result.ok()) {
call_destination_->StartCall(std::move(call.handler));
call_initiator->SpawnGuarded(
"server-write", [this, stream_id = frame_header.stream_id,
call_initiator = *call_initiator]() {
return CallOutboundLoop(stream_id, call_initiator);
});
} else {
status = create_call_result.status();
call_initiator.reset();
status = add_result;
}
}
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
@ -366,10 +356,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
PromiseEndpoint data_endpoint,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
HPackParser hpack_parser, HPackCompressor hpack_encoder)
: outgoing_frames_(4),
allocator_(args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("chaotic-good")) {
: call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("chaotic-good"),
1024)),
event_engine_(event_engine),
outgoing_frames_(4) {
auto transport = MakeRefCounted<ChaoticGoodTransport>(
std::move(control_endpoint), std::move(data_endpoint),
std::move(hpack_parser), std::move(hpack_encoder));
@ -381,20 +374,25 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
OnTransportActivityDone("reader"));
}
void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
CHECK_EQ(acceptor_, nullptr);
CHECK_NE(acceptor, nullptr);
acceptor_ = acceptor;
void ChaoticGoodServerTransport::SetCallDestination(
RefCountedPtr<UnstartedCallDestination> call_destination) {
CHECK(call_destination_ == nullptr);
CHECK(call_destination != nullptr);
call_destination_ = call_destination;
got_acceptor_.Set();
}
ChaoticGoodServerTransport::~ChaoticGoodServerTransport() {
if (writer_ != nullptr) {
writer_.reset();
}
if (reader_ != nullptr) {
reader_.reset();
void ChaoticGoodServerTransport::Orphan() {
ActivityPtr writer;
ActivityPtr reader;
{
MutexLock lock(&mu_);
writer = std::move(writer_);
reader = std::move(reader_);
}
writer.reset();
reader.reset();
Unref();
}
void ChaoticGoodServerTransport::AbortWithError() {

@ -86,7 +86,6 @@ class ChaoticGoodServerTransport final : public ServerTransport {
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
HPackParser hpack_parser, HPackCompressor hpack_encoder);
~ChaoticGoodServerTransport() override;
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return nullptr; }
@ -96,9 +95,10 @@ class ChaoticGoodServerTransport final : public ServerTransport {
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { Unref(); }
void Orphan() override;
void SetAcceptor(Acceptor* acceptor) override;
void SetCallDestination(
RefCountedPtr<UnstartedCallDestination> call_destination) override;
void AbortWithError();
private:
@ -137,7 +137,10 @@ class ChaoticGoodServerTransport final : public ServerTransport {
auto PushFragmentIntoCall(CallInitiator call_initiator,
ClientFragmentFrame frame, uint32_t stream_id);
Acceptor* acceptor_ = nullptr;
RefCountedPtr<UnstartedCallDestination> call_destination_;
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
InterActivityLatch<void> got_acceptor_;
MpscReceiver<ServerFrame> outgoing_frames_;
// Assigned aligned bytes from setting frame.
@ -146,7 +149,6 @@ class ChaoticGoodServerTransport final : public ServerTransport {
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
grpc_event_engine::experimental::MemoryAllocator allocator_;
ActivityPtr writer_ ABSL_GUARDED_BY(mu_);
ActivityPtr reader_ ABSL_GUARDED_BY(mu_);
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){

@ -28,6 +28,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/surface/channel_create.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h"
@ -39,8 +40,18 @@ class InprocClientTransport;
class InprocServerTransport final : public ServerTransport {
public:
void SetAcceptor(Acceptor* acceptor) override {
acceptor_ = acceptor;
explicit InprocServerTransport(const ChannelArgs& args)
: event_engine_(
args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
call_arena_allocator_(MakeRefCounted<CallArenaAllocator>(
args.GetObject<ResourceQuota>()
->memory_quota()
->CreateMemoryAllocator("inproc_server"),
1024)) {}
void SetCallDestination(
RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) override {
unstarted_call_handler_ = unstarted_call_handler;
ConnectionState expect = ConnectionState::kInitial;
state_.compare_exchange_strong(expect, ConnectionState::kReady,
std::memory_order_acq_rel,
@ -95,7 +106,11 @@ class InprocServerTransport final : public ServerTransport {
case ConnectionState::kReady:
break;
}
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
auto* arena = call_arena_allocator_->MakeArena();
auto server_call = MakeCallPair(std::move(md), event_engine_.get(), arena,
call_arena_allocator_, nullptr);
unstarted_call_handler_->StartCall(std::move(server_call.handler));
return std::move(server_call.initiator);
}
OrphanablePtr<InprocClientTransport> MakeClientTransport();
@ -105,11 +120,14 @@ class InprocServerTransport final : public ServerTransport {
std::atomic<ConnectionState> state_{ConnectionState::kInitial};
std::atomic<bool> disconnecting_{false};
Acceptor* acceptor_;
RefCountedPtr<UnstartedCallDestination> unstarted_call_handler_;
absl::Status disconnect_error_;
Mutex state_tracker_mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
"inproc_server_transport", GRPC_CHANNEL_CONNECTING};
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_;
};
class InprocClientTransport final : public ClientTransport {
@ -118,16 +136,19 @@ class InprocClientTransport final : public ClientTransport {
RefCountedPtr<InprocServerTransport> server_transport)
: server_transport_(std::move(server_transport)) {}
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
void StartCall(CallHandler child_call_handler) override {
child_call_handler.SpawnGuarded(
"pull_initial_metadata",
TrySeq(call_handler.PullClientInitialMetadata(),
TrySeq(child_call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
call_handler](ClientMetadataHandle md) {
auto call_initiator =
child_call_handler](ClientMetadataHandle md) {
auto server_call_initiator =
server_transport->AcceptCall(std::move(md));
if (!call_initiator.ok()) return call_initiator.status();
ForwardCall(call_handler, std::move(*call_initiator));
if (!server_call_initiator.ok()) {
return server_call_initiator.status();
}
ForwardCall(child_call_handler,
std::move(*server_call_initiator));
return absl::OkStatus();
}));
}
@ -155,7 +176,6 @@ class InprocClientTransport final : public ClientTransport {
bool UsePromiseBasedTransport() {
if (!IsPromiseBasedInprocTransportEnabled()) return false;
CHECK(IsPromiseBasedClientCallEnabled());
CHECK(IsPromiseBasedServerCallEnabled());
return true;
}
@ -180,7 +200,7 @@ OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
OrphanablePtr<Channel> MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
auto transports = MakeInProcessTransportPair();
auto transports = MakeInProcessTransportPair(server->channel_args());
auto client_transport = std::move(transports.first);
auto server_transport = std::move(transports.second);
auto error =
@ -205,8 +225,9 @@ OrphanablePtr<Channel> MakeInprocChannel(Server* server,
} // namespace
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair() {
auto server_transport = MakeOrphanable<InprocServerTransport>();
MakeInProcessTransportPair(const ChannelArgs& server_channel_args) {
auto server_transport =
MakeOrphanable<InprocServerTransport>(server_channel_args);
auto client_transport = server_transport->MakeClientTransport();
return std::make_pair(std::move(client_transport),
std::move(server_transport));

@ -30,7 +30,7 @@ extern grpc_core::TraceFlag grpc_inproc_trace;
namespace grpc_core {
std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair();
MakeInProcessTransportPair(const ChannelArgs& server_channel_args);
}

@ -305,13 +305,6 @@ grpc_core::NextPromiseFactory ClientNext(grpc_channel_element* elem) {
};
}
grpc_core::NextPromiseFactory ServerNext(grpc_channel_element* elem) {
return [elem](grpc_core::CallArgs args) {
return elem->filter->make_call_promise(elem, std::move(args),
ServerNext(elem - 1));
};
}
} // namespace
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
@ -319,12 +312,6 @@ grpc_channel_stack::MakeClientCallPromise(grpc_core::CallArgs call_args) {
return ClientNext(grpc_channel_stack_element(this, 0))(std::move(call_args));
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
grpc_channel_stack::MakeServerCallPromise(grpc_core::CallArgs call_args) {
return ServerNext(grpc_channel_stack_element(this, this->count - 1))(
std::move(call_args));
}
void grpc_channel_stack::InitClientCallSpine(
grpc_core::CallSpineInterface* call) {
for (size_t i = 0; i < count; i++) {
@ -338,19 +325,6 @@ void grpc_channel_stack::InitClientCallSpine(
}
}
void grpc_channel_stack::InitServerCallSpine(
grpc_core::CallSpineInterface* call) {
for (size_t i = 0; i < count; i++) {
auto* elem = grpc_channel_stack_element(this, count - 1 - i);
if (elem->filter->init_call == nullptr) {
grpc_core::Crash(
absl::StrCat("Filter '", elem->filter->name,
"' does not support the call-v3 interface"));
}
elem->filter->init_call(elem, call);
}
}
void grpc_call_log_op(const char* file, int line, gpr_log_severity severity,
grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {

@ -241,7 +241,6 @@ struct grpc_channel_stack {
MakeServerCallPromise(grpc_core::CallArgs call_args);
void InitClientCallSpine(grpc_core::CallSpineInterface* call);
void InitServerCallSpine(grpc_core::CallSpineInterface* call);
};
// A call stack tracks a set of related filters for one call, and guarantees

@ -88,23 +88,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener)};
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@ -184,15 +178,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 2, false, false},
required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
@ -288,23 +280,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener)};
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@ -384,15 +370,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 2, false, false},
required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
@ -488,23 +472,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineClient),
static_cast<uint8_t>(grpc_core::kExperimentIdEventEngineListener)};
const char* const description_promise_based_server_call =
"If set, use the new gRPC promise based call code when it's appropriate "
"(ie when all filters in a stack are promise based)";
const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall),
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedServerCall)};
static_cast<uint8_t>(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@ -584,15 +562,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
{"promise_based_server_call", description_promise_based_server_call,
additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
required_experiments_promise_based_inproc_transport, 2, false, false},
required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",

@ -79,7 +79,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@ -119,7 +118,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@ -159,7 +157,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@ -195,7 +192,6 @@ enum ExperimentIds {
kExperimentIdPendingQueueCap,
kExperimentIdPickFirstNew,
kExperimentIdPromiseBasedClientCall,
kExperimentIdPromiseBasedServerCall,
kExperimentIdChaoticGood,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdRstpit,
@ -277,10 +273,6 @@ inline bool IsPickFirstNewEnabled() {
inline bool IsPromiseBasedClientCallEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedClientCall);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
inline bool IsPromiseBasedServerCallEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedServerCall);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_CHAOTIC_GOOD
inline bool IsChaoticGoodEnabled() {
return IsExperimentEnabled(kExperimentIdChaoticGood);

@ -64,7 +64,7 @@
If set, enable the chaotic good load transport (this is mostly here for testing)
expiry: 2024/09/09
owner: ctiller@google.com
requires: [promise_based_client_call, promise_based_server_call]
requires: [promise_based_client_call]
test_tags: [core_end2end_test]
- name: client_privacy
description:
@ -170,14 +170,7 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: false # experiment currently crashes if enabled
requires: [promise_based_client_call, promise_based_server_call]
- name: promise_based_server_call
description:
If set, use the new gRPC promise based call code when it's appropriate
(ie when all filters in a stack are promise based)
expiry: 2024/06/14
owner: ctiller@google.com
test_tags: ["core_end2end_test", "logging_test"]
requires: [promise_based_client_call]
- name: rstpit
description:
On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration

@ -94,8 +94,6 @@
ios: broken
windows: broken
posix: false
- name: promise_based_server_call
default: false
- name: rstpit
default: false
- name: schedule_cancellation_over_write

@ -17,6 +17,7 @@
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/meta/type_traits.h"
#include <grpc/support/port_platform.h>
@ -63,6 +64,10 @@ auto WrapInPoll(T&& x) -> decltype(PollWrapper<T>::Wrap(std::forward<T>(x))) {
return PollWrapper<T>::Wrap(std::forward<T>(x));
}
// T -> T, const T& -> T
template <typename T>
using RemoveCVRef = absl::remove_cv_t<absl::remove_reference_t<T>>;
template <typename F, typename SfinaeVoid = void>
class PromiseLike;
@ -73,7 +78,7 @@ template <typename F>
class PromiseLike<F, absl::enable_if_t<!std::is_void<
typename std::result_of<F()>::type>::value>> {
private:
GPR_NO_UNIQUE_ADDRESS F f_;
GPR_NO_UNIQUE_ADDRESS RemoveCVRef<F> f_;
public:
// NOLINTNEXTLINE - internal detail that drastically simplifies calling code.
@ -82,10 +87,6 @@ class PromiseLike<F, absl::enable_if_t<!std::is_void<
using Result = typename PollTraits<decltype(WrapInPoll(f_()))>::Type;
};
// T -> T, const T& -> T
template <typename T>
using RemoveCVRef = absl::remove_cv_t<absl::remove_reference_t<T>>;
} // namespace promise_detail
} // namespace grpc_core

@ -171,6 +171,8 @@ class ValueOrFailure {
T& value() { return value_.value(); }
const T& operator*() const { return *value_; }
T& operator*() { return *value_; }
const T* operator->() const { return &*value_; }
T* operator->() { return &*value_; }
bool operator==(const ValueOrFailure& other) const {
return value_ == other.value_;

File diff suppressed because it is too large Load Diff

@ -82,7 +82,7 @@ class Call : public CppImplOf<Call, grpc_call>,
public grpc_event_engine::experimental::EventEngine::
Closure /* for deadlines */ {
public:
Arena* arena() { return arena_; }
virtual Arena* arena() = 0;
bool is_client() const { return is_client_; }
virtual void ContextSet(grpc_context_index elem, void* value,
@ -92,7 +92,7 @@ class Call : public CppImplOf<Call, grpc_call>,
void CancelWithStatus(grpc_status_code status, const char* description);
virtual void CancelWithError(grpc_error_handle error) = 0;
virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
char* GetPeer();
virtual char* GetPeer() = 0;
virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) = 0;
@ -157,25 +157,15 @@ class Call : public CppImplOf<Call, grpc_call>,
Call* sibling_prev = nullptr;
};
Call(Arena* arena, bool is_client, Timestamp send_deadline,
RefCountedPtr<Channel> channel)
: channel_(std::move(channel)),
arena_(arena),
send_deadline_(send_deadline),
is_client_(is_client) {
DCHECK_NE(arena_, nullptr);
DCHECK(channel_ != nullptr);
}
Call(bool is_client, Timestamp send_deadline,
grpc_event_engine::experimental::EventEngine* event_engine)
: send_deadline_(send_deadline),
is_client_(is_client),
event_engine_(event_engine) {}
~Call() override = default;
void DeleteThis();
ParentCall* GetOrCreateParentCall();
ParentCall* parent_call();
Channel* channel() const {
DCHECK(channel_ != nullptr);
return channel_.get();
}
absl::Status InitParent(Call* parent, uint32_t propagation_mask);
void PublishToParent(Call* parent);
@ -221,9 +211,9 @@ class Call : public CppImplOf<Call, grpc_call>,
gpr_cycle_counter start_time() const { return start_time_; }
virtual grpc_compression_options compression_options() = 0;
private:
RefCountedPtr<Channel> channel_;
Arena* const arena_;
std::atomic<ParentCall*> parent_call_{nullptr};
ChildCall* child_ = nullptr;
Timestamp send_deadline_;
@ -247,34 +237,13 @@ class Call : public CppImplOf<Call, grpc_call>,
Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
deadline_mu_) deadline_task_;
grpc_event_engine::experimental::EventEngine* const event_engine_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
};
class BasicPromiseBasedCall;
class ServerPromiseBasedCall;
class ServerCallContext {
public:
virtual void PublishInitialMetadata(
ClientMetadataHandle metadata,
grpc_metadata_array* publish_initial_metadata) = 0;
// Construct the top of the server call promise for the v2 filter stack.
// TODO(ctiller): delete when v3 is available.
virtual ArenaPromise<ServerMetadataHandle> MakeTopOfServerCallPromise(
CallArgs call_args, grpc_completion_queue* cq,
absl::FunctionRef<void(grpc_call* call)> publish) = 0;
// Server stream data as supplied by the transport (so we can link the
// transport stream up with the call again).
// TODO(ctiller): legacy API - once we move transports to promises we'll
// create the promise directly and not need to pass around this token.
virtual const void* server_stream_data() = 0;
protected:
~ServerCallContext() = default;
};
// TODO(ctiller): move more call things into this type
class CallContext {
public:
@ -300,8 +269,6 @@ class CallContext {
gpr_atm* peer_string_atm_ptr();
gpr_cycle_counter call_start_time() { return start_time_; }
ServerCallContext* server_call_context();
void set_traced(bool traced) { traced_ = traced; }
bool traced() const { return traced_; }
@ -329,9 +296,10 @@ template <>
struct ContextType<CallContext> {};
// TODO(ctiller): remove once call-v3 finalized
RefCountedPtr<CallSpineInterface> MakeServerCall(
ClientMetadataHandle client_initial_metadata, ServerInterface* server,
Channel* channel, Arena* arena);
grpc_call* MakeServerCall(CallHandler call_handler,
ClientMetadataHandle client_initial_metadata,
ServerInterface* server, grpc_completion_queue* cq,
grpc_metadata_array* publish_initial_metadata);
} // namespace grpc_core

@ -104,9 +104,9 @@ ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
const ChannelFilterVtable* vtable, SourceLocation registration_source) {
FilterAdder filter_adder, SourceLocation registration_source) {
filters_[type].emplace_back(std::make_unique<FilterRegistration>(
filter, vtable, registration_source));
filter, filter_adder, registration_source));
return *filters_[type].back();
}
@ -223,9 +223,10 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
while (!dependencies.empty()) {
auto filter = take_ready_dependency();
auto* registration = filter_to_registration[filter];
filters.emplace_back(
filter, registration->vtable_, std::move(registration->predicates_),
registration->skip_v3_, registration->registration_source_);
filters.emplace_back(filter, registration->filter_adder_,
std::move(registration->predicates_),
registration->skip_v3_,
registration->registration_source_);
for (auto& p : dependencies) {
p.second.erase(filter);
}
@ -406,78 +407,21 @@ bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
return true;
}
absl::StatusOr<ChannelInit::StackSegment> ChannelInit::CreateStackSegment(
grpc_channel_stack_type type, const ChannelArgs& args) const {
void ChannelInit::AddToInterceptionChainBuilder(
grpc_channel_stack_type type, InterceptionChainBuilder& builder) const {
const auto& stack_config = stack_configs_[type];
std::vector<StackSegment::ChannelFilter> filters;
size_t channel_data_size = 0;
size_t channel_data_alignment = 0;
// Based on predicates build a list of filters to include in this segment.
for (const auto& filter : stack_config.filters) {
if (filter.skip_v3) continue;
if (!filter.CheckPredicates(args)) continue;
if (filter.vtable == nullptr) {
return absl::InvalidArgumentError(
if (!filter.CheckPredicates(builder.channel_args())) continue;
if (filter.filter_adder == nullptr) {
builder.Fail(absl::InvalidArgumentError(
absl::StrCat("Filter ", NameFromChannelFilter(filter.filter),
" has no v3-callstack vtable"));
" has no v3-callstack vtable")));
return;
}
channel_data_alignment =
std::max(channel_data_alignment, filter.vtable->alignment);
if (channel_data_size % filter.vtable->alignment != 0) {
channel_data_size += filter.vtable->alignment -
(channel_data_size % filter.vtable->alignment);
}
filters.push_back({channel_data_size, filter.vtable});
channel_data_size += filter.vtable->size;
}
// Shortcut for empty segments.
if (filters.empty()) return StackSegment();
// Allocate memory for the channel data, initialize channel filters into it.
uint8_t* p = static_cast<uint8_t*>(
gpr_malloc_aligned(channel_data_size, channel_data_alignment));
for (size_t i = 0; i < filters.size(); i++) {
auto r = filters[i].vtable->init(p + filters[i].offset, args);
if (!r.ok()) {
for (size_t j = 0; j < i; j++) {
filters[j].vtable->destroy(p + filters[j].offset);
}
gpr_free_aligned(p);
return r;
}
}
return StackSegment(std::move(filters), p);
}
///////////////////////////////////////////////////////////////////////////////
// ChannelInit::StackSegment
ChannelInit::StackSegment::StackSegment(std::vector<ChannelFilter> filters,
uint8_t* channel_data)
: data_(MakeRefCounted<ChannelData>(std::move(filters), channel_data)) {}
void ChannelInit::StackSegment::AddToCallFilterStack(
CallFilters::StackBuilder& builder) {
if (data_ == nullptr) return;
data_->AddToCallFilterStack(builder);
builder.AddOwnedObject(data_);
};
ChannelInit::StackSegment::ChannelData::ChannelData(
std::vector<ChannelFilter> filters, uint8_t* channel_data)
: filters_(std::move(filters)), channel_data_(channel_data) {}
void ChannelInit::StackSegment::ChannelData::AddToCallFilterStack(
CallFilters::StackBuilder& builder) {
for (const auto& filter : filters_) {
filter.vtable->add_to_stack_builder(channel_data_ + filter.offset, builder);
}
}
ChannelInit::StackSegment::ChannelData::~ChannelData() {
for (const auto& filter : filters_) {
filter.vtable->destroy(channel_data_ + filter.offset);
filter.filter_adder(builder);
}
gpr_free_aligned(channel_data_);
}
} // namespace grpc_core

@ -38,6 +38,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/interception_chain.h"
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
@ -68,6 +69,8 @@ class ChannelInit {
using InclusionPredicate = absl::AnyInvocable<bool(const ChannelArgs&) const>;
// Post processor for the channel stack - applied in PostProcessorSlot order
using PostProcessor = absl::AnyInvocable<void(ChannelStackBuilder&) const>;
// Function that can be called to add a filter to a stack builder
using FilterAdder = void (*)(InterceptionChainBuilder&);
// Post processing slots - up to one PostProcessor per slot can be registered
// They run after filters registered are added to the channel stack builder,
// but before Build is called - allowing ad-hoc mutation to the channel stack.
@ -77,25 +80,15 @@ class ChannelInit {
kCount
};
// Vtable-like data structure for channel data initialization
struct ChannelFilterVtable {
size_t size;
size_t alignment;
absl::Status (*init)(void* data, const ChannelArgs& args);
void (*destroy)(void* data);
void (*add_to_stack_builder)(void* data,
CallFilters::StackBuilder& builder);
};
class FilterRegistration {
public:
// TODO(ctiller): Remove grpc_channel_filter* arg when that can be
// deprecated (once filter stack is removed).
explicit FilterRegistration(const grpc_channel_filter* filter,
const ChannelFilterVtable* vtable,
FilterAdder filter_adder,
SourceLocation registration_source)
: filter_(filter),
vtable_(vtable),
filter_adder_(filter_adder),
registration_source_(registration_source) {}
FilterRegistration(const FilterRegistration&) = delete;
FilterRegistration& operator=(const FilterRegistration&) = delete;
@ -170,7 +163,7 @@ class ChannelInit {
private:
friend class ChannelInit;
const grpc_channel_filter* const filter_;
const ChannelFilterVtable* const vtable_;
const FilterAdder filter_adder_;
std::vector<const grpc_channel_filter*> after_;
std::vector<const grpc_channel_filter*> before_;
std::vector<InclusionPredicate> predicates_;
@ -188,16 +181,17 @@ class ChannelInit {
// properties of the filter being registered.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
const ChannelFilterVtable* vtable = nullptr,
SourceLocation registration_source = {});
FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
const grpc_channel_filter* filter,
FilterAdder filter_adder = nullptr,
SourceLocation registration_source = {});
template <typename Filter>
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
return RegisterFilter(type, &Filter::kFilter,
VtableForType<Filter>::vtable(),
registration_source);
return RegisterFilter(
type, &Filter::kFilter,
[](InterceptionChainBuilder& builder) { builder.Add<Filter>(); },
registration_source);
}
// Filter does not participate in v3
@ -232,58 +226,13 @@ class ChannelInit {
[static_cast<int>(PostProcessorSlot::kCount)];
};
// A set of channel filters that can be added to a call stack.
// TODO(ctiller): move this out so it can be used independently of
// the global registration mechanisms.
class StackSegment final {
public:
// Registration of one channel filter in the stack.
struct ChannelFilter {
size_t offset;
const ChannelFilterVtable* vtable;
};
StackSegment() = default;
explicit StackSegment(std::vector<ChannelFilter> filters,
uint8_t* channel_data);
StackSegment(const StackSegment& other) = delete;
StackSegment& operator=(const StackSegment& other) = delete;
StackSegment(StackSegment&& other) noexcept = default;
StackSegment& operator=(StackSegment&& other) = default;
// Add this segment to a call filter stack builder
void AddToCallFilterStack(CallFilters::StackBuilder& builder);
private:
// Combined channel data for the stack
class ChannelData : public RefCounted<ChannelData> {
public:
explicit ChannelData(std::vector<ChannelFilter> filters,
uint8_t* channel_data);
~ChannelData() override;
void AddToCallFilterStack(CallFilters::StackBuilder& builder);
private:
std::vector<ChannelFilter> filters_;
uint8_t* channel_data_;
};
RefCountedPtr<ChannelData> data_;
};
/// Construct a channel stack of some sort: see channel_stack.h for details
/// \a builder is the channel stack builder to build into.
GRPC_MUST_USE_RESULT
bool CreateStack(ChannelStackBuilder* builder) const;
// Create a segment of a channel stack.
// Terminators and post processors are not included in this construction:
// terminators are a legacy filter-stack concept, and post processors
// need to migrate to other mechanisms.
// TODO(ctiller): figure out other mechanisms.
absl::StatusOr<StackSegment> CreateStackSegment(
grpc_channel_stack_type type, const ChannelArgs& args) const;
void AddToInterceptionChainBuilder(grpc_channel_stack_type type,
InterceptionChainBuilder& builder) const;
private:
// The type of object returned by a filter's Create method.
@ -292,16 +241,16 @@ class ChannelInit {
typename decltype(T::Create(ChannelArgs(), {}))::value_type;
struct Filter {
Filter(const grpc_channel_filter* filter, const ChannelFilterVtable* vtable,
Filter(const grpc_channel_filter* filter, FilterAdder filter_adder,
std::vector<InclusionPredicate> predicates, bool skip_v3,
SourceLocation registration_source)
: filter(filter),
vtable(vtable),
filter_adder(filter_adder),
predicates(std::move(predicates)),
registration_source(registration_source),
skip_v3(skip_v3) {}
const grpc_channel_filter* filter;
const ChannelFilterVtable* vtable;
const FilterAdder filter_adder;
std::vector<InclusionPredicate> predicates;
SourceLocation registration_source;
bool skip_v3 = false;
@ -313,17 +262,6 @@ class ChannelInit {
std::vector<PostProcessor> post_processors;
};
template <typename T, typename = void>
struct VtableForType {
static const ChannelFilterVtable* vtable() { return nullptr; }
};
template <typename T>
struct VtableForType<T, absl::void_t<typename T::Call>> {
static const ChannelFilterVtable kVtable;
static const ChannelFilterVtable* vtable() { return &kVtable; }
};
StackConfig stack_configs_[GRPC_NUM_CHANNEL_STACK_TYPES];
static StackConfig BuildStackConfig(
@ -331,22 +269,6 @@ class ChannelInit {
PostProcessor* post_processors, grpc_channel_stack_type type);
};
template <typename T>
const ChannelInit::ChannelFilterVtable
ChannelInit::VtableForType<T, absl::void_t<typename T::Call>>::kVtable = {
sizeof(CreatedType<T>), alignof(CreatedType<T>),
[](void* data, const ChannelArgs& args) -> absl::Status {
// TODO(ctiller): fill in ChannelFilter::Args (2nd arg)
absl::StatusOr<CreatedType<T>> r = T::Create(args, {});
if (!r.ok()) return r.status();
new (data) CreatedType<T>(std::move(*r));
return absl::OkStatus();
},
[](void* data) { Destruct(static_cast<CreatedType<T>*>(data)); },
[](void* data, CallFilters::StackBuilder& builder) {
builder.Add(static_cast<CreatedType<T>*>(data)->get());
}};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_INIT_H

@ -66,10 +66,10 @@ static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
namespace grpc_core {
void RegisterSecurityFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_SUBCHANNEL)
->RegisterV2Filter<ClientAuthFilter>(GRPC_CLIENT_SUBCHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter<ClientAuthFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
->RegisterV2Filter<ClientAuthFilter>(GRPC_CLIENT_DIRECT_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter<ServerAuthFilter>(GRPC_SERVER_CHANNEL)

@ -51,6 +51,24 @@ class CallDestination : public DualRefCounted<CallDestination> {
virtual void HandleCall(CallHandler unstarted_call_handler) = 0;
};
template <typename HC>
auto MakeCallDestinationFromHandlerFunction(HC handle_call) {
class Impl : public CallDestination {
public:
explicit Impl(HC handle_call) : handle_call_(std::move(handle_call)) {}
void Orphaned() override {}
void HandleCall(CallHandler call_handler) override {
handle_call_(std::move(call_handler));
}
private:
HC handle_call_;
};
return MakeRefCounted<Impl>(std::move(handle_call));
}
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H

@ -1245,6 +1245,42 @@ const NoInterceptor
template <typename Fn>
const NoInterceptor ServerTrailingMetadataInterceptor<Fn>::Call::OnFinalize;
template <typename Fn>
class ClientInitialMetadataInterceptor {
public:
class Call {
public:
auto OnClientInitialMetadata(ClientMetadata& md,
ClientInitialMetadataInterceptor* filter) {
return filter->fn_(md);
}
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
};
explicit ClientInitialMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {}
private:
GPR_NO_UNIQUE_ADDRESS Fn fn_;
};
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnServerInitialMetadata;
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnClientToServerMessage;
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnServerToClientMessage;
template <typename Fn>
const NoInterceptor
ClientInitialMetadataInterceptor<Fn>::Call::OnServerTrailingMetadata;
template <typename Fn>
const NoInterceptor ClientInitialMetadataInterceptor<Fn>::Call::OnFinalize;
} // namespace filters_detail
// Execution environment for a stack of filters.
@ -1302,6 +1338,14 @@ class CallFilters {
AddOwnedObject([](void* p) { delete static_cast<T*>(p); }, p.release());
}
template <typename Fn>
void AddOnClientInitialMetadata(Fn fn) {
auto filter = std::make_unique<
filters_detail::ClientInitialMetadataInterceptor<Fn>>(std::move(fn));
Add(filter.get());
AddOwnedObject(std::move(filter));
}
template <typename Fn>
void AddOnServerTrailingMetadata(Fn fn) {
auto filter = std::make_unique<

@ -518,6 +518,8 @@ class CallHandler {
auto PullMessage() { return spine_->PullClientToServerMessage(); }
auto WasCancelled() { return spine_->WasCancelled(); }
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory,
DebugLocation whence = {}) {
@ -536,11 +538,15 @@ class CallHandler {
Arena* arena() { return spine_->arena(); }
grpc_event_engine::experimental::EventEngine* event_engine() {
grpc_event_engine::experimental::EventEngine* event_engine() const {
return DownCast<CallSpine*>(spine_.get())->event_engine();
}
// TODO(ctiller): re-evaluate this API
const grpc_call_context_element* legacy_context() const {
return DownCast<CallSpine*>(spine_.get())->legacy_context();
}
grpc_call_context_element* legacy_context() {
return DownCast<CallSpine*>(spine_.get())->legacy_context();
}

@ -175,12 +175,23 @@ class InterceptionChainBuilder final {
return *this;
};
// Add a filter that just mutates client initial metadata.
template <typename F>
void AddOnClientInitialMetadata(F f) {
stack_builder().AddOnClientInitialMetadata(std::move(f));
}
// Add a filter that just mutates server trailing metadata.
template <typename F>
void AddOnServerTrailingMetadata(F f) {
stack_builder().AddOnServerTrailingMetadata(std::move(f));
}
void Fail(absl::Status status) {
CHECK(!status.ok()) << status;
if (status_.ok()) status_ = std::move(status);
}
// Build this stack
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> Build(
FinalDestination final_destination);

@ -55,6 +55,7 @@
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/lib/transport/call_final_info.h"
#include "src/core/lib/transport/call_spine.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -481,6 +482,15 @@ typedef struct grpc_transport_op {
grpc_handler_private_op_data handler_private;
} grpc_transport_op;
// Allocate a grpc_transport_op, and preconfigure the on_complete closure to
// \a on_complete and then delete the returned transport op
grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete);
// Allocate a grpc_transport_stream_op_batch, and preconfigure the on_complete
// closure
// to \a on_complete and then delete the returned transport op
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
grpc_closure* on_complete);
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_transport_stream_op_batch* batch, grpc_error_handle error,
grpc_core::CallCombiner* call_combiner);
@ -507,6 +517,21 @@ class Transport : public InternallyRefCounted<Transport> {
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }
// Though internally ref counted transports expose their "Ref" method to
// create a RefCountedPtr to themselves. The OrphanablePtr owner is the
// singleton decision maker on whether the transport should be destroyed or
// not.
// TODO(ctiller): consider moving to a DualRefCounted model (with the
// disadvantage that we would accidentally have many strong owners which is
// unnecessary for this type).
RefCountedPtr<Transport> Ref() {
return InternallyRefCounted<Transport>::Ref();
}
template <typename T>
RefCountedPtr<T> RefAsSubclass() {
return InternallyRefCounted<Transport>::RefAsSubclass<T>();
}
virtual FilterStackTransport* filter_stack_transport() = 0;
virtual ClientTransport* client_transport() = 0;
virtual ServerTransport* server_transport() = 0;
@ -527,6 +552,20 @@ class Transport : public InternallyRefCounted<Transport> {
// implementation of grpc_transport_perform_op
virtual void PerformOp(grpc_transport_op* op) = 0;
void StartConnectivityWatch(
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher);
PerformOp(op);
}
void DisconnectWithError(grpc_error_handle error) {
CHECK(!error.ok()) << error;
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = error;
PerformOp(op);
}
// implementation of grpc_transport_get_endpoint
virtual grpc_endpoint* GetEndpoint() = 0;
};
@ -582,24 +621,9 @@ class ClientTransport : public Transport {
class ServerTransport : public Transport {
public:
// Acceptor helps transports create calls.
class Acceptor {
public:
// Returns an arena that can be used to allocate memory for initial metadata
// parsing, and later passed to CreateCall() as the underlying arena for
// that call.
virtual Arena* CreateArena() = 0;
// Create a call at the server (or fail)
// arena must have been previously allocated by CreateArena()
virtual absl::StatusOr<CallInitiator> CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) = 0;
protected:
~Acceptor() = default;
};
// Called once slightly after transport setup to register the accept function.
virtual void SetAcceptor(Acceptor* acceptor) = 0;
virtual void SetCallDestination(
RefCountedPtr<UnstartedCallDestination> unstarted_call_handler) = 0;
protected:
~ServerTransport() override = default;
@ -607,15 +631,6 @@ class ServerTransport : public Transport {
} // namespace grpc_core
// Allocate a grpc_transport_op, and preconfigure the on_complete closure to
// \a on_complete and then delete the returned transport op
grpc_transport_op* grpc_make_transport_op(grpc_closure* on_complete);
// Allocate a grpc_transport_stream_op_batch, and preconfigure the on_complete
// closure
// to \a on_complete and then delete the returned transport op
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(
grpc_closure* on_complete);
namespace grpc_core {
// This is the key to be used for loading/storing keepalive_throttling in the
// absl::Status object.

@ -79,10 +79,11 @@ namespace {
void RegisterBuiltins(CoreConfiguration::Builder* builder) {
RegisterServerCallTracerFilter(builder);
builder->channel_init()
->RegisterFilter<LameClientFilter>(GRPC_CLIENT_LAME_CHANNEL)
->RegisterV2Filter<LameClientFilter>(GRPC_CLIENT_LAME_CHANNEL)
.Terminal();
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &Server::kServerTopFilter)
.SkipV3()
.BeforeAll();
}

@ -57,6 +57,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
@ -81,6 +82,7 @@
#include "src/core/lib/surface/wait_for_cq_end_op.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/interception_chain.h"
namespace grpc_core {
@ -235,7 +237,8 @@ struct Server::RequestedCall {
template <typename OptionalPayload>
void Complete(OptionalPayload payload, ClientMetadata& md) {
Timestamp deadline = GetContext<Call>()->deadline();
Timestamp deadline =
md.get(GrpcTimeoutMetadata()).value_or(Timestamp::InfFuture());
switch (type) {
case RequestedCall::Type::BATCH_CALL:
CHECK(!payload.has_value());
@ -288,23 +291,29 @@ struct Server::RequestedCall {
// application to explicitly request RPCs and then matching those to incoming
// RPCs, along with a slow path by which incoming RPCs are put on a locked
// pending list if they aren't able to be matched to an application request.
class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
class Server::RealRequestMatcher : public RequestMatcherInterface {
public:
explicit RealRequestMatcherFilterStack(Server* server)
explicit RealRequestMatcher(Server* server)
: server_(server), requests_per_cq_(server->cqs_.size()) {}
~RealRequestMatcherFilterStack() override {
~RealRequestMatcher() override {
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
CHECK_EQ(queue.Pop(), nullptr);
}
CHECK(pending_.empty());
CHECK(pending_filter_stack_.empty());
CHECK(pending_promises_.empty());
}
void ZombifyPending() override {
while (!pending_.empty()) {
pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
pending_.front().calld->KillZombie();
pending_.pop();
while (!pending_filter_stack_.empty()) {
pending_filter_stack_.front().calld->SetState(
CallData::CallState::ZOMBIED);
pending_filter_stack_.front().calld->KillZombie();
pending_filter_stack_.pop();
}
while (!pending_promises_.empty()) {
pending_promises_.front()->Finish(absl::InternalError("Server closed"));
pending_promises_.pop();
}
}
@ -329,35 +338,56 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
// matching calls
struct NextPendingCall {
RequestedCall* rc = nullptr;
CallData* pending;
CallData* pending_filter_stack = nullptr;
PendingCallPromises pending_promise;
};
while (true) {
NextPendingCall pending_call;
{
MutexLock lock(&server_->mu_call_);
while (!pending_.empty() &&
pending_.front().Age() > server_->max_time_in_pending_queue_) {
pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
pending_.front().calld->KillZombie();
pending_.pop();
while (!pending_filter_stack_.empty() &&
pending_filter_stack_.front().Age() >
server_->max_time_in_pending_queue_) {
pending_filter_stack_.front().calld->SetState(
CallData::CallState::ZOMBIED);
pending_filter_stack_.front().calld->KillZombie();
pending_filter_stack_.pop();
}
if (!pending_.empty()) {
if (!pending_promises_.empty()) {
pending_call.rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[request_queue_index].Pop());
if (pending_call.rc != nullptr) {
pending_call.pending = pending_.front().calld;
pending_.pop();
pending_call.pending_promise =
std::move(pending_promises_.front());
pending_promises_.pop();
}
} else if (!pending_filter_stack_.empty()) {
pending_call.rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[request_queue_index].Pop());
if (pending_call.rc != nullptr) {
pending_call.pending_filter_stack =
pending_filter_stack_.front().calld;
pending_filter_stack_.pop();
}
}
}
if (pending_call.rc == nullptr) break;
if (!pending_call.pending->MaybeActivate()) {
// Zombied Call
pending_call.pending->KillZombie();
requests_per_cq_[request_queue_index].Push(
&pending_call.rc->mpscq_node);
if (pending_call.pending_filter_stack != nullptr) {
if (!pending_call.pending_filter_stack->MaybeActivate()) {
// Zombied Call
pending_call.pending_filter_stack->KillZombie();
requests_per_cq_[request_queue_index].Push(
&pending_call.rc->mpscq_node);
} else {
pending_call.pending_filter_stack->Publish(request_queue_index,
pending_call.rc);
}
} else {
pending_call.pending->Publish(request_queue_index, pending_call.rc);
if (!pending_call.pending_promise->Finish(
server(), request_queue_index, pending_call.rc)) {
requests_per_cq_[request_queue_index].Push(
&pending_call.rc->mpscq_node);
}
}
}
}
@ -395,7 +425,7 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
}
if (rc == nullptr) {
calld->SetState(CallData::CallState::PENDING);
pending_.push(PendingCall{calld});
pending_filter_stack_.push(PendingCallFilterStack{calld});
return;
}
}
@ -403,91 +433,6 @@ class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
calld->Publish(cq_idx, rc);
}
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override {
Crash("not implemented for filter stack request matcher");
}
Server* server() const final { return server_; }
private:
Server* const server_;
struct PendingCall {
CallData* calld;
Timestamp created = Timestamp::Now();
Duration Age() { return Timestamp::Now() - created; }
};
std::queue<PendingCall> pending_;
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};
class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
public:
explicit RealRequestMatcherPromises(Server* server)
: server_(server), requests_per_cq_(server->cqs_.size()) {}
~RealRequestMatcherPromises() override {
for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
CHECK_EQ(queue.Pop(), nullptr);
}
}
void ZombifyPending() override {
while (!pending_.empty()) {
pending_.front()->Finish(absl::InternalError("Server closed"));
pending_.pop();
}
}
void KillRequests(grpc_error_handle error) override {
for (size_t i = 0; i < requests_per_cq_.size(); i++) {
RequestedCall* rc;
while ((rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[i].Pop())) != nullptr) {
server_->FailCall(i, rc, error);
}
}
}
size_t request_queue_count() const override {
return requests_per_cq_.size();
}
void RequestCallWithPossiblePublish(size_t request_queue_index,
RequestedCall* call) override {
if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
// this was the first queued request: we need to lock and start
// matching calls
struct NextPendingCall {
RequestedCall* rc = nullptr;
PendingCall pending;
};
while (true) {
NextPendingCall pending_call;
{
MutexLock lock(&server_->mu_call_);
if (!pending_.empty()) {
pending_call.rc = reinterpret_cast<RequestedCall*>(
requests_per_cq_[request_queue_index].Pop());
if (pending_call.rc != nullptr) {
pending_call.pending = std::move(pending_.front());
pending_.pop();
}
}
}
if (pending_call.rc == nullptr) break;
if (!pending_call.pending->Finish(server(), request_queue_index,
pending_call.rc)) {
requests_per_cq_[request_queue_index].Push(
&pending_call.rc->mpscq_node);
}
}
}
}
void MatchOrQueue(size_t, CallData*) override {
Crash("not implemented for promises");
}
ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
size_t start_request_queue_index) override {
for (size_t i = 0; i < requests_per_cq_.size(); i++) {
@ -509,10 +454,11 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
{
std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
MutexLock lock(&server_->mu_call_);
while (!pending_.empty() &&
pending_.front()->Age() > server_->max_time_in_pending_queue_) {
removed_pending.push_back(std::move(pending_.front()));
pending_.pop();
while (!pending_promises_.empty() &&
pending_promises_.front()->Age() >
server_->max_time_in_pending_queue_) {
removed_pending.push_back(std::move(pending_promises_.front()));
pending_promises_.pop();
}
for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
cq_idx =
@ -521,14 +467,14 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
if (rc != nullptr) break;
}
if (rc == nullptr) {
if (server_->pending_backlog_protector_.Reject(pending_.size(),
if (server_->pending_backlog_protector_.Reject(pending_promises_.size(),
server_->bitgen_)) {
return Immediate(absl::ResourceExhaustedError(
"Too many pending requests for this server"));
}
auto w = std::make_shared<ActivityWaiter>(
GetContext<Activity>()->MakeOwningWaker());
pending_.push(w);
pending_promises_.push(w);
return OnCancel(
[w]() -> Poll<absl::StatusOr<MatchResult>> {
std::unique_ptr<absl::StatusOr<MatchResult>> r(
@ -546,6 +492,11 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
private:
Server* const server_;
struct PendingCallFilterStack {
CallData* calld;
Timestamp created = Timestamp::Now();
Duration Age() { return Timestamp::Now() - created; }
};
struct ActivityWaiter {
using ResultType = absl::StatusOr<MatchResult>;
explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
@ -580,8 +531,9 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
std::atomic<ResultType*> result{nullptr};
const Timestamp created = Timestamp::Now();
};
using PendingCall = std::shared_ptr<ActivityWaiter>;
std::queue<PendingCall> pending_;
using PendingCallPromises = std::shared_ptr<ActivityWaiter>;
std::queue<PendingCallFilterStack> pending_filter_stack_;
std::queue<PendingCallPromises> pending_promises_;
std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};
@ -784,13 +736,40 @@ class ChannelBroadcaster {
} // namespace
//
// Server::TransportConnectivityWatcher
//
class Server::TransportConnectivityWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
TransportConnectivityWatcher(RefCountedPtr<ServerTransport> transport,
RefCountedPtr<Server> server)
: transport_(std::move(transport)), server_(std::move(server)) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& /*status*/) override {
// Don't do anything until we are being shut down.
if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
// Shut down channel.
MutexLock lock(&server_->mu_global_);
server_->connections_.erase(transport_.get());
--server_->connections_open_;
server_->MaybeFinishShutdown();
}
RefCountedPtr<ServerTransport> transport_;
RefCountedPtr<Server> server_;
};
//
// Server
//
const grpc_channel_filter Server::kServerTopFilter = {
Server::CallData::StartTransportStreamOpBatch,
Server::ChannelData::MakeCallPromise,
nullptr,
[](grpc_channel_element*, CallSpineInterface*) {
// TODO(ctiller): remove the server filter when call-v3 is finalized
},
@ -826,12 +805,91 @@ RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
return channelz_node;
}
absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
ValueOrFailure<ClientMetadataHandle> md) {
if (!md.ok()) {
return absl::InternalError("Missing metadata");
}
if (!md.value()->get_pointer(HttpPathMetadata())) {
return absl::InternalError("Missing :path header");
}
if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
return absl::InternalError("Missing :authority header");
}
return std::move(*md);
}
} // namespace
auto Server::MatchAndPublishCall(CallHandler call_handler) {
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable { return call_handler.PullMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler, this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>
Server::MakeCallDestination(const ChannelArgs& args) {
InterceptionChainBuilder builder(args);
builder.AddOnClientInitialMetadata(
[this](ClientMetadata& md) { SetRegisteredMethodOnMetadata(md); });
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_SERVER_CHANNEL, builder);
return builder.Build(
MakeCallDestinationFromHandlerFunction([this](CallHandler handler) {
return MatchAndPublishCall(std::move(handler));
}));
}
Server::Server(const ChannelArgs& args)
: channel_args_(args),
channelz_node_(CreateChannelzNode(args)),
server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
compression_options_(CompressionOptionsFromChannelArgs(args)),
max_time_in_pending_queue_(Duration::Seconds(
channel_args_
.GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
@ -862,15 +920,6 @@ void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
}
void Server::Start() {
auto make_real_request_matcher =
[this]() -> std::unique_ptr<RequestMatcherInterface> {
if (IsPromiseBasedServerCallEnabled()) {
return std::make_unique<RealRequestMatcherPromises>(this);
} else {
return std::make_unique<RealRequestMatcherFilterStack>(this);
}
};
started_ = true;
for (grpc_completion_queue* cq : cqs_) {
if (grpc_cq_can_listen(cq)) {
@ -878,11 +927,11 @@ void Server::Start() {
}
}
if (unregistered_request_matcher_ == nullptr) {
unregistered_request_matcher_ = make_real_request_matcher();
unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this);
}
for (auto& rm : registered_methods_) {
if (rm.second->matcher == nullptr) {
rm.second->matcher = make_real_request_matcher();
rm.second->matcher = std::make_unique<RealRequestMatcher>(this);
}
}
{
@ -913,37 +962,61 @@ grpc_error_handle Server::SetupTransport(
const RefCountedPtr<channelz::SocketNode>& socket_node) {
// Create channel.
global_stats().IncrementServerChannelsCreated();
absl::StatusOr<OrphanablePtr<Channel>> channel =
LegacyChannel::Create("", args.SetObject(transport), GRPC_SERVER_CHANNEL);
if (!channel.ok()) {
return absl_status_to_grpc_error(channel.status());
}
ChannelData* chand = static_cast<ChannelData*>(
grpc_channel_stack_element((*channel)->channel_stack(), 0)->channel_data);
// Set up CQs.
size_t cq_idx;
for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
}
if (cq_idx == cqs_.size()) {
// Completion queue not found. Pick a random one to publish new calls to.
cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
}
// Set up channelz node.
intptr_t channelz_socket_uuid = 0;
if (socket_node != nullptr) {
channelz_socket_uuid = socket_node->uuid();
channelz_node_->AddChildSocket(socket_node);
}
// Initialize chand.
chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
channelz_socket_uuid);
if (transport->server_transport() != nullptr) {
// Take ownership
// TODO(ctiller): post-v3-transition make this method take an
// OrphanablePtr<ServerTransport> directly.
OrphanablePtr<ServerTransport> t(transport->server_transport());
auto destination = MakeCallDestination(args.SetObject(transport));
if (!destination.ok()) {
return absl_status_to_grpc_error(destination.status());
}
// TODO(ctiller): add channelz node
t->SetCallDestination(std::move(*destination));
MutexLock lock(&mu_global_);
if (ShutdownCalled()) {
t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown"));
}
t->StartConnectivityWatch(MakeOrphanable<TransportConnectivityWatcher>(
t->RefAsSubclass<ServerTransport>(), Ref()));
gpr_log(GPR_INFO, "Adding connection");
connections_.emplace(std::move(t));
++connections_open_;
} else {
CHECK(transport->filter_stack_transport() != nullptr);
absl::StatusOr<OrphanablePtr<Channel>> channel = LegacyChannel::Create(
"", args.SetObject(transport), GRPC_SERVER_CHANNEL);
if (!channel.ok()) {
return absl_status_to_grpc_error(channel.status());
}
ChannelData* chand = static_cast<ChannelData*>(
grpc_channel_stack_element((*channel)->channel_stack(), 0)
->channel_data);
// Set up CQs.
size_t cq_idx;
for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
}
if (cq_idx == cqs_.size()) {
// Completion queue not found. Pick a random one to publish new calls to.
cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
}
intptr_t channelz_socket_uuid = 0;
if (socket_node != nullptr) {
channelz_socket_uuid = socket_node->uuid();
channelz_node_->AddChildSocket(socket_node);
}
// Initialize chand.
chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
channelz_socket_uuid);
}
return absl::OkStatus();
}
bool Server::HasOpenConnections() {
MutexLock lock(&mu_global_);
return !channels_.empty();
return !channels_.empty() || !connections_.empty();
}
void Server::SetRegisteredMethodAllocator(
@ -1023,16 +1096,18 @@ void Server::MaybeFinishShutdown() {
MutexLock lock(&mu_call_);
KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
}
if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) {
if (!channels_.empty() || connections_open_ > 0 ||
listeners_destroyed_ < listeners_.size()) {
if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
last_shutdown_message_time_),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
gpr_log(GPR_DEBUG,
"Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR
"Waiting for %" PRIuPTR " channels %" PRIuPTR
" connections and %" PRIuPTR "/%" PRIuPTR
" listeners to be destroyed before shutting down server",
channels_.size(), listeners_.size() - listeners_destroyed_,
listeners_.size());
channels_.size(), connections_open_,
listeners_.size() - listeners_destroyed_, listeners_.size());
}
return;
}
@ -1095,6 +1170,7 @@ void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
// -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
ChannelBroadcaster broadcaster;
absl::flat_hash_set<OrphanablePtr<ServerTransport>> removing_connections;
{
// Wait for startup to be finished. Locks mu_global.
MutexLock lock(&mu_global_);
@ -1114,6 +1190,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
}
last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
broadcaster.FillChannelsLocked(GetChannelsLocked());
removing_connections.swap(connections_);
// Collect all unregistered then registered calls.
{
MutexLock lock(&mu_call_);
@ -1300,17 +1377,6 @@ Server::ChannelData::~ChannelData() {
}
}
Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); }
absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) {
SetRegisteredMethodOnMetadata(*client_initial_metadata);
auto call = MakeServerCall(std::move(client_initial_metadata), server_.get(),
channel_.get(), arena);
InitCall(call);
return CallInitiator(std::move(call));
}
void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
OrphanablePtr<Channel> channel,
size_t cq_idx, Transport* transport,
@ -1327,22 +1393,15 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
}
// Start accept_stream transport op.
grpc_transport_op* op = grpc_make_transport_op(nullptr);
int accept_stream_types = 0;
if (transport->filter_stack_transport() != nullptr) {
++accept_stream_types;
op->set_accept_stream = true;
op->set_accept_stream_fn = AcceptStream;
op->set_registered_method_matcher_fn = [](void* arg,
ClientMetadata* metadata) {
static_cast<ChannelData*>(arg)->SetRegisteredMethodOnMetadata(*metadata);
};
op->set_accept_stream_user_data = this;
}
if (transport->server_transport() != nullptr) {
++accept_stream_types;
transport->server_transport()->SetAcceptor(this);
}
CHECK_EQ(accept_stream_types, 1);
CHECK(transport->filter_stack_transport() != nullptr);
op->set_accept_stream = true;
op->set_accept_stream_fn = AcceptStream;
op->set_registered_method_matcher_fn = [](void* arg,
ClientMetadata* metadata) {
static_cast<ChannelData*>(arg)->server_->SetRegisteredMethodOnMetadata(
*metadata);
};
op->set_accept_stream_user_data = this;
op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this);
if (server_->ShutdownCalled()) {
op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
@ -1350,24 +1409,23 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
transport->PerformOp(op);
}
Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod(
Server::RegisteredMethod* Server::GetRegisteredMethod(
const absl::string_view& host, const absl::string_view& path) {
if (server_->registered_methods_.empty()) return nullptr;
if (registered_methods_.empty()) return nullptr;
// check for an exact match with host
auto it = server_->registered_methods_.find(std::make_pair(host, path));
if (it != server_->registered_methods_.end()) {
auto it = registered_methods_.find(std::make_pair(host, path));
if (it != registered_methods_.end()) {
return it->second.get();
}
// check for wildcard method definition (no host set)
it = server_->registered_methods_.find(std::make_pair("", path));
if (it != server_->registered_methods_.end()) {
it = registered_methods_.find(std::make_pair("", path));
if (it != registered_methods_.end()) {
return it->second.get();
}
return nullptr;
}
void Server::ChannelData::SetRegisteredMethodOnMetadata(
ClientMetadata& metadata) {
void Server::SetRegisteredMethodOnMetadata(ClientMetadata& metadata) {
auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
if (authority == nullptr) {
authority = metadata.get_pointer(HostMetadata());
@ -1403,188 +1461,14 @@ void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,
grpc_call* call;
grpc_error_handle error = grpc_call_create(&args, &call);
grpc_call_stack* call_stack = grpc_call_get_call_stack(call);
if (call_stack == nullptr) { // Promise based calls do not have a call stack
CHECK(error.ok());
CHECK(IsPromiseBasedServerCallEnabled());
CHECK_NE(call_stack, nullptr);
grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
auto* calld = static_cast<Server::CallData*>(elem->call_data);
if (!error.ok()) {
calld->FailCallCreation();
return;
} else {
grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
auto* calld = static_cast<Server::CallData*>(elem->call_data);
if (!error.ok()) {
calld->FailCallCreation();
return;
}
calld->Start(elem);
}
}
namespace {
auto CancelledDueToServerShutdown() {
return [] {
return ServerMetadataFromStatus(absl::CancelledError("Server shutdown"));
};
}
} // namespace
void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
call->SpawnGuarded("request_matcher", [this, call]() {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call->PullClientInitialMetadata(),
[](ValueOrFailure<ClientMetadataHandle> md)
-> absl::StatusOr<ClientMetadataHandle> {
if (!md.ok()) {
return absl::InternalError("Missing metadata");
}
if (!md.value()->get_pointer(HttpPathMetadata())) {
return absl::InternalError("Missing :path header");
}
if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
return absl::InternalError("Missing :authority header");
}
return std::move(*md);
}),
// Match request with requested call
[this, call](ClientMetadataHandle md) {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = server_->unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call]() { return call->PullClientToServerMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(cq_idx()),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
auto* call_context = GetContext<CallContext>();
const auto* deadline = md->get_pointer(GrpcTimeoutMetadata());
if (deadline != nullptr) {
GetContext<Call>()->UpdateDeadline(*deadline);
}
*rc->call = call_context->c_call();
grpc_call_ref(*rc->call);
grpc_call_set_completion_queue(call_context->c_call(),
rc->cq_bound_to_call);
call_context->server_call_context()->PublishInitialMetadata(
std::move(md), rc->initial_metadata);
// TODO(ctiller): publish metadata
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}
ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
auto* chand = static_cast<Server::ChannelData*>(elem->channel_data);
auto* server = chand->server_.get();
if (server->ShutdownCalled()) return CancelledDueToServerShutdown();
auto cleanup_ref =
absl::MakeCleanup([server] { server->ShutdownUnrefOnRequest(); });
if (!server->ShutdownRefOnRequest()) return CancelledDueToServerShutdown();
auto path_ptr =
call_args.client_initial_metadata->get_pointer(HttpPathMetadata());
if (path_ptr == nullptr) {
return [] {
return ServerMetadataFromStatus(
absl::InternalError("Missing :path header"));
};
}
auto host_ptr =
call_args.client_initial_metadata->get_pointer(HttpAuthorityMetadata());
if (host_ptr == nullptr) {
return [] {
return ServerMetadataFromStatus(
absl::InternalError("Missing :authority header"));
};
}
// Find request matcher.
RequestMatcherInterface* matcher;
RegisteredMethod* rm = static_cast<RegisteredMethod*>(
call_args.client_initial_metadata->get(GrpcRegisteredMethod())
.value_or(nullptr));
ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>>
maybe_read_first_message([] { return NextResult<MessageHandle>(); });
if (rm != nullptr) {
matcher = rm->matcher.get();
switch (rm->payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
break;
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER:
maybe_read_first_message =
Map(call_args.client_to_server_messages->Next(),
[](NextResult<MessageHandle> msg)
-> absl::StatusOr<NextResult<MessageHandle>> {
return std::move(msg);
});
}
} else {
matcher = server->unregistered_request_matcher_.get();
}
return TrySeq(
std::move(maybe_read_first_message),
[cleanup_ref = std::move(cleanup_ref), matcher,
chand](NextResult<MessageHandle> payload) mutable {
return Map(
[cleanup_ref = std::move(cleanup_ref),
mr = matcher->MatchRequest(chand->cq_idx())]() mutable {
return mr();
},
[payload = std::move(payload)](
absl::StatusOr<RequestMatcherInterface::MatchResult> mr) mutable
-> absl::StatusOr<std::pair<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>> {
if (!mr.ok()) return mr.status();
return std::make_pair(std::move(*mr), std::move(payload));
});
},
[call_args =
std::move(call_args)](std::pair<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>
r) mutable {
auto& mr = r.first;
auto& payload = r.second;
auto* rc = mr.TakeCall();
auto* cq_for_new_request = mr.cq();
auto* server_call_context =
GetContext<CallContext>()->server_call_context();
rc->Complete(std::move(payload), *call_args.client_initial_metadata);
server_call_context->PublishInitialMetadata(
std::move(call_args.client_initial_metadata), rc->initial_metadata);
return server_call_context->MakeTopOfServerCallPromise(
std::move(call_args), rc->cq_bound_to_call,
[rc, cq_for_new_request](grpc_call* call) {
*rc->call = call;
grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(),
Server::DoneRequestEvent, rc, &rc->completion,
true);
});
});
calld->Start(elem);
}
void Server::ChannelData::FinishDestroy(void* arg,

@ -38,6 +38,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/passive_listener.h>
#include <grpc/slice.h>
@ -215,6 +216,10 @@ class Server : public ServerInterface,
void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
grpc_compression_options compression_options() const override {
return compression_options_;
}
private:
// note: the grpc_core::Server redundant namespace qualification is
// required for older gcc versions.
@ -227,13 +232,12 @@ class Server : public ServerInterface,
struct RequestedCall;
class RequestMatcherInterface;
class RealRequestMatcherFilterStack;
class RealRequestMatcherPromises;
class RealRequestMatcher;
class AllocatingRequestMatcherBase;
class AllocatingRequestMatcherBatch;
class AllocatingRequestMatcherRegistered;
class ChannelData final : public ServerTransport::Acceptor {
class ChannelData final {
public:
ChannelData() = default;
~ChannelData();
@ -246,26 +250,17 @@ class Server : public ServerInterface,
Channel* channel() const { return channel_.get(); }
size_t cq_idx() const { return cq_idx_; }
RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
// Filter vtable functions.
static grpc_error_handle InitChannelElement(
grpc_channel_element* elem, grpc_channel_element_args* args);
static void DestroyChannelElement(grpc_channel_element* elem);
static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory);
void InitCall(RefCountedPtr<CallSpineInterface> call);
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
private:
class ConnectivityWatcher;
static void AcceptStream(void* arg, Transport* /*transport*/,
const void* transport_server_data);
void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);
@ -396,6 +391,12 @@ class Server : public ServerInterface,
using is_transparent = void;
};
class TransportConnectivityWatcher;
RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
static void ListenerDestroyDone(void* arg, grpc_error_handle error);
static void DoneShutdownEvent(void* server,
@ -457,6 +458,10 @@ class Server : public ServerInterface,
return shutdown_refs_.load(std::memory_order_acquire) == 0;
}
auto MatchAndPublishCall(CallHandler call_handler);
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> MakeCallDestination(
const ChannelArgs& args);
ChannelArgs const channel_args_;
RefCountedPtr<channelz::ServerNode> channelz_node_;
std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
@ -465,6 +470,7 @@ class Server : public ServerInterface,
std::vector<grpc_completion_queue*> cqs_;
std::vector<grpc_pollset*> pollsets_;
bool started_ = false;
const grpc_compression_options compression_options_;
// The two following mutexes control access to server-state.
// mu_global_ controls access to non-call-related state (e.g., channel state).
@ -512,6 +518,9 @@ class Server : public ServerInterface,
absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
std::list<ChannelData*> channels_;
absl::flat_hash_set<OrphanablePtr<ServerTransport>> connections_
ABSL_GUARDED_BY(mu_global_);
size_t connections_open_ ABSL_GUARDED_BY(mu_global_) = 0;
std::list<Listener> listeners_;
size_t listeners_destroyed_ = 0;

@ -17,6 +17,7 @@
#ifndef GRPC_SRC_CORE_SERVER_SERVER_INTERFACE_H
#define GRPC_SRC_CORE_SERVER_SERVER_INTERFACE_H
#include <grpc/impl/compression_types.h>
#include <grpc/support/port_platform.h>
#include "src/core/channelz/channelz.h"
@ -36,6 +37,7 @@ class ServerInterface {
virtual const ChannelArgs& channel_args() const = 0;
virtual channelz::ServerNode* channelz_node() const = 0;
virtual ServerCallTracerFactory* server_call_tracer_factory() const = 0;
virtual grpc_compression_options compression_options() const = 0;
};
} // namespace grpc_core

@ -736,6 +736,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/transport/call_spine.cc',
'src/core/lib/transport/connectivity_state.cc',
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/interception_chain.cc',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/metadata.cc',
'src/core/lib/transport/metadata_batch.cc',

@ -103,9 +103,6 @@ CORE_END2END_TEST(CoreEnd2endTest, FilterContext) {
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
for (auto type : {GRPC_CLIENT_CHANNEL, GRPC_CLIENT_SUBCHANNEL,
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_SERVER_CHANNEL}) {
if (type == GRPC_SERVER_CHANNEL && IsPromiseBasedServerCallEnabled()) {
continue;
}
builder->channel_init()->RegisterFilter(type, &test_filter);
}
});

@ -23,7 +23,10 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_arena_allocator.h"
#include "test/core/test_util/test_config.h"
namespace grpc_core {
@ -208,7 +211,7 @@ class TestFilter1 {
explicit TestFilter1(int* p) : p_(p) {}
static absl::StatusOr<std::unique_ptr<TestFilter1>> Create(
const ChannelArgs& args, Empty) {
const ChannelArgs& args, ChannelFilter::Args) {
EXPECT_EQ(args.GetInt("foo"), 1);
return std::make_unique<TestFilter1>(args.GetPointer<int>("p"));
}
@ -250,19 +253,23 @@ TEST(ChannelInitTest, CanCreateFilterWithCall) {
b.RegisterFilter<TestFilter1>(GRPC_CLIENT_CHANNEL);
auto init = b.Build();
int p = 0;
auto segment = init.CreateStackSegment(
GRPC_CLIENT_CHANNEL,
ChannelArgs().Set("foo", 1).Set("p", ChannelArgs::UnownedPointer(&p)));
ASSERT_TRUE(segment.ok()) << segment.status();
CallFilters::StackBuilder stack_builder;
segment->AddToCallFilterStack(stack_builder);
segment = absl::CancelledError(); // force the segment to be destroyed
auto stack = stack_builder.Build();
{
CallFilters call_filters(Arena::MakePooled<ClientMetadata>());
call_filters.SetStack(std::move(stack));
}
InterceptionChainBuilder chain_builder{
ChannelArgs().Set("foo", 1).Set("p", ChannelArgs::UnownedPointer(&p))};
init.AddToInterceptionChainBuilder(GRPC_CLIENT_CHANNEL, chain_builder);
int handled = 0;
auto stack = chain_builder.Build(MakeCallDestinationFromHandlerFunction(
[&handled](CallHandler) { ++handled; }));
ASSERT_TRUE(stack.ok()) << stack.status();
RefCountedPtr<CallArenaAllocator> allocator =
MakeRefCounted<CallArenaAllocator>(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test"),
1024);
auto call = MakeCallPair(Arena::MakePooled<ClientMetadata>(), nullptr,
allocator->MakeArena(), allocator, nullptr);
(*stack)->StartCall(std::move(call.handler));
EXPECT_EQ(p, 1);
EXPECT_EQ(handled, 1);
}
} // namespace

@ -39,6 +39,7 @@
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/server/server.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/test_util/build.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"
@ -67,7 +68,10 @@ class ChaoticGoodServerTest : public ::testing::Test {
auto ev = grpc_completion_queue_pluck(
shutdown_cq, nullptr, grpc_timeout_milliseconds_to_deadline(15000),
nullptr);
CHECK(ev.type == GRPC_OP_COMPLETE);
if (ev.type == GRPC_QUEUE_TIMEOUT) {
AsanAssertNoLeaks();
}
CHECK_EQ(ev.type, GRPC_OP_COMPLETE);
CHECK_EQ(ev.tag, nullptr);
grpc_completion_queue_destroy(shutdown_cq);
grpc_server_destroy(server_);

@ -82,19 +82,19 @@ ServerMetadataHandle TestTrailingMetadata() {
return md;
}
class MockAcceptor : public ServerTransport::Acceptor {
class MockCallDestination : public UnstartedCallDestination {
public:
virtual ~MockAcceptor() = default;
MOCK_METHOD(Arena*, CreateArena, (), (override));
MOCK_METHOD(absl::StatusOr<CallInitiator>, CreateCall,
(ClientMetadataHandle client_initial_metadata, Arena* arena),
~MockCallDestination() override = default;
MOCK_METHOD(void, Orphaned, (), (override));
MOCK_METHOD(void, StartCall, (UnstartedCallHandler unstarted_call_handler),
(override));
};
TEST_F(TransportTest, ReadAndWriteOneMessage) {
MockPromiseEndpoint control_endpoint;
MockPromiseEndpoint data_endpoint;
StrictMock<MockAcceptor> acceptor;
auto call_destination = MakeRefCounted<StrictMock<MockCallDestination>>();
EXPECT_CALL(*call_destination, Orphaned()).Times(1);
auto transport = MakeOrphanable<ChaoticGoodServerTransport>(
CoreConfiguration::Get()
.channel_args_preconditioning()
@ -112,19 +112,21 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
data_endpoint.ExpectRead(
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr);
// Once that's read we'll create a new call
auto* call_arena = MakeArena();
EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena));
StrictMock<MockFunction<void()>> on_done;
EXPECT_CALL(acceptor, CreateCall(_, call_arena))
.WillOnce(WithArgs<0>([this, call_arena, &on_done](
ClientMetadataHandle client_initial_metadata) {
EXPECT_EQ(client_initial_metadata->get_pointer(HttpPathMetadata())
auto control_address =
grpc_event_engine::experimental::URIToResolvedAddress("ipv4:1.2.3.4:5678")
.value();
EXPECT_CALL(*control_endpoint.endpoint, GetPeerAddress)
.WillRepeatedly([&control_address]() { return control_address; });
EXPECT_CALL(*call_destination, StartCall(_))
.WillOnce(WithArgs<0>([&on_done](
UnstartedCallHandler unstarted_call_handler) {
EXPECT_EQ(unstarted_call_handler.UnprocessedClientInitialMetadata()
.get_pointer(HttpPathMetadata())
->as_string_view(),
"/demo.Service/Step");
CallInitiatorAndHandler call = MakeCallPair(
std::move(client_initial_metadata), event_engine().get(),
call_arena, call_arena_allocator(), nullptr);
auto handler = call.handler.V2HackToStartCallWithoutACallFilterStack();
auto handler =
unstarted_call_handler.V2HackToStartCallWithoutACallFilterStack();
handler.SpawnInfallible("test-io", [&on_done, handler]() mutable {
return Seq(
handler.PullClientInitialMetadata(),
@ -163,9 +165,8 @@ TEST_F(TransportTest, ReadAndWriteOneMessage) {
return Empty{};
});
});
return std::move(call.initiator);
}));
transport->SetAcceptor(&acceptor);
transport->SetCallDestination(call_destination);
EXPECT_CALL(on_done, Call());
EXPECT_CALL(*control_endpoint.endpoint, Read)
.InSequence(control_endpoint.read_sequence)

@ -62,7 +62,7 @@ void FillMetadata(const std::vector<std::pair<std::string, std::string>>& md,
} // namespace
TRANSPORT_TEST(UnaryWithSomeContent) {
SetServerAcceptor();
SetServerCallDestination();
const auto client_initial_metadata = RandomMetadata();
const auto server_initial_metadata = RandomMetadata();
const auto server_trailing_metadata = RandomMetadata();

@ -17,7 +17,7 @@
namespace grpc_core {
TRANSPORT_TEST(MetadataOnlyRequest) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -72,7 +72,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
"wrong status code: we don't care for any cases we're "
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -125,7 +125,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
"wrong status code: we don't care for any cases we're "
"rolling out soon, so leaving this disabled.";
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -165,7 +165,7 @@ TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
}
TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -178,7 +178,7 @@ TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
}
TRANSPORT_TEST(UnaryRequest) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -257,7 +257,7 @@ TRANSPORT_TEST(UnaryRequest) {
}
TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -326,7 +326,7 @@ TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
}
TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -404,7 +404,7 @@ TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
}
TRANSPORT_TEST(ClientStreamingRequest) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));
@ -518,7 +518,7 @@ TRANSPORT_TEST(ClientStreamingRequest) {
}
TRANSPORT_TEST(ServerStreamingRequest) {
SetServerAcceptor();
SetServerCallDestination();
auto md = Arena::MakePooled<ClientMetadata>();
md->Set(HttpPathMetadata(), Slice::FromExternalString("/foo/bar"));
auto initiator = CreateCall(std::move(md));

@ -13,12 +13,16 @@
// limitations under the License.
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/config/core_configuration.h"
#include "test/core/transport/test_suite/fixture.h"
namespace grpc_core {
TRANSPORT_FIXTURE(Inproc) {
auto transports = MakeInProcessTransportPair();
auto transports =
MakeInProcessTransportPair(CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr));
return {std::move(transports.first), std::move(transports.second)};
}

@ -20,8 +20,8 @@ TRANSPORT_TEST(NoOp) {}
TRANSPORT_TEST(WaitForAllPendingWork) { WaitForAllPendingWork(); }
TRANSPORT_TEST(SetServerAcceptorAndFinish) {
SetServerAcceptor();
TRANSPORT_TEST(SetServerCallDestinationAndFinish) {
SetServerCallDestination();
WaitForAllPendingWork();
}

@ -19,7 +19,7 @@
namespace grpc_core {
TRANSPORT_TEST(ManyUnaryRequests) {
SetServerAcceptor();
SetServerCallDestination();
const int kNumRequests = absl::LogUniform<int>(rng(), 10, 100);
std::list<std::string> call_names;
auto make_call_name = [&call_names](int i,

@ -52,8 +52,9 @@ void TransportTest::RunTest() {
event_engine_->UnsetGlobalHooks();
}
void TransportTest::SetServerAcceptor() {
transport_pair_.server->server_transport()->SetAcceptor(&acceptor_);
void TransportTest::SetServerCallDestination() {
transport_pair_.server->server_transport()->SetCallDestination(
server_call_destination_);
}
CallInitiator TransportTest::CreateCall(
@ -71,8 +72,10 @@ CallInitiator TransportTest::CreateCall(
CallHandler TransportTest::TickUntilServerCall() {
WatchDog watchdog(this);
for (;;) {
auto handler = acceptor_.PopHandler();
if (handler.has_value()) return std::move(*handler);
auto handler = server_call_destination_->PopHandler();
if (handler.has_value()) {
return std::move(*handler);
}
event_engine_->Tick();
}
}
@ -227,22 +230,14 @@ std::string TransportTest::RandomMessage() {
}
///////////////////////////////////////////////////////////////////////////////
// TransportTest::Acceptor
Arena* TransportTest::Acceptor::CreateArena() {
return test_->call_arena_allocator_->MakeArena();
}
// TransportTest::ServerCallDestination
absl::StatusOr<CallInitiator> TransportTest::Acceptor::CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) {
auto call = MakeCallPair(std::move(client_initial_metadata),
test_->event_engine_.get(), arena,
test_->call_arena_allocator_, nullptr);
handlers_.push(call.handler.V2HackToStartCallWithoutACallFilterStack());
return std::move(call.initiator);
void TransportTest::ServerCallDestination::StartCall(
UnstartedCallHandler handler) {
handlers_.push(handler.V2HackToStartCallWithoutACallFilterStack());
}
absl::optional<CallHandler> TransportTest::Acceptor::PopHandler() {
absl::optional<CallHandler> TransportTest::ServerCallDestination::PopHandler() {
if (!handlers_.empty()) {
auto handler = std::move(handlers_.front());
handlers_.pop();

@ -32,6 +32,7 @@
#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/test_util/test_config.h"
#include "test/core/transport/test_suite/fixture.h"
namespace grpc_core {
@ -220,7 +221,7 @@ class TransportTest : public ::testing::Test {
fixture_(std::move(fixture)),
rng_(rng) {}
void SetServerAcceptor();
void SetServerCallDestination();
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata);
std::string RandomString(int min_length, int max_length,
@ -270,18 +271,14 @@ class TransportTest : public ::testing::Test {
void Timeout();
class Acceptor final : public ServerTransport::Acceptor {
class ServerCallDestination final : public UnstartedCallDestination {
public:
explicit Acceptor(TransportTest* test) : test_(test) {}
Arena* CreateArena() override;
absl::StatusOr<CallInitiator> CreateCall(
ClientMetadataHandle client_initial_metadata, Arena* arena) override;
void StartCall(UnstartedCallHandler unstarted_call_handler) override;
void Orphaned() override {}
absl::optional<CallHandler> PopHandler();
private:
std::queue<CallHandler> handlers_;
TransportTest* const test_;
};
class WatchDog {
@ -296,6 +293,7 @@ class TransportTest : public ::testing::Test {
[this]() { test_->Timeout(); })};
};
grpc::testing::TestGrpcScope grpc_scope_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_{
std::make_shared<grpc_event_engine::experimental::FuzzingEventEngine>(
@ -313,7 +311,8 @@ class TransportTest : public ::testing::Test {
->memory_quota()
->CreateMemoryAllocator("test-allocator"),
1024)};
Acceptor acceptor_{this};
RefCountedPtr<ServerCallDestination> server_call_destination_ =
MakeRefCounted<ServerCallDestination>();
TransportFixture::ClientAndServerTransportPair transport_pair_ =
fixture_->CreateTransportPair(event_engine_);
std::queue<std::shared_ptr<transport_test_detail::ActionState>>

@ -111,7 +111,6 @@ int main(int argc, char** argv) {
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
grpc_core::ForceEnableExperiment("promise_based_client_call", true);
grpc_core::ForceEnableExperiment("promise_based_server_call", true);
grpc_core::ForceEnableExperiment("chaotic_good", true);
grpc::testing::TestEnvironment env(&argc, argv);
LibraryInitializer libInit;

@ -2750,6 +2750,7 @@ src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_arena_allocator.h \
src/core/lib/transport/call_destination.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
@ -2762,6 +2763,8 @@ src/core/lib/transport/custom_metadata.h \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/error_utils.h \
src/core/lib/transport/http2_errors.h \
src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/interception_chain.h \
src/core/lib/transport/message.cc \
src/core/lib/transport/message.h \
src/core/lib/transport/metadata.cc \

@ -2527,6 +2527,7 @@ src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_arena_allocator.cc \
src/core/lib/transport/call_arena_allocator.h \
src/core/lib/transport/call_destination.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \
@ -2539,6 +2540,8 @@ src/core/lib/transport/custom_metadata.h \
src/core/lib/transport/error_utils.cc \
src/core/lib/transport/error_utils.h \
src/core/lib/transport/http2_errors.h \
src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/interception_chain.h \
src/core/lib/transport/message.cc \
src/core/lib/transport/message.h \
src/core/lib/transport/metadata.cc \

Loading…
Cancel
Save