diff --git a/BUILD b/BUILD
index 0c113b8bf25..f83a6bdc989 100644
--- a/BUILD
+++ b/BUILD
@@ -1942,9 +1942,7 @@ grpc_cc_library(
"promise",
"ref_counted_ptr",
"stats",
- "//src/core:1999",
"//src/core:activity",
- "//src/core:arena",
"//src/core:arena_promise",
"//src/core:cancel_callback",
"//src/core:channel_args",
@@ -1958,6 +1956,7 @@ grpc_cc_library(
"//src/core:error",
"//src/core:error_utils",
"//src/core:experiments",
+ "//src/core:interception_chain",
"//src/core:iomgr_fwd",
"//src/core:map",
"//src/core:metadata_batch",
@@ -2078,6 +2077,7 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:atomic_utils",
"//src/core:bitset",
+ "//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_final_info",
"//src/core:call_finalization",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1b107dddb0f..3499823786a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2502,6 +2502,7 @@ add_library(grpc
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
+ src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
@@ -3255,6 +3256,7 @@ add_library(grpc_unsecure
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
+ src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
@@ -5366,6 +5368,7 @@ add_library(grpc_authorization_provider
src/core/lib/transport/call_spine.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/error_utils.cc
+ src/core/lib/transport/interception_chain.cc
src/core/lib/transport/message.cc
src/core/lib/transport/metadata.cc
src/core/lib/transport/metadata_batch.cc
diff --git a/Makefile b/Makefile
index 25ebda3f986..22625964566 100644
--- a/Makefile
+++ b/Makefile
@@ -1387,6 +1387,7 @@ LIBGRPC_SRC = \
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
+ src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \
diff --git a/Package.swift b/Package.swift
index 81f99552ac2..c4405cf775f 100644
--- a/Package.swift
+++ b/Package.swift
@@ -1749,6 +1749,7 @@ let package = Package(
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_arena_allocator.cc",
"src/core/lib/transport/call_arena_allocator.h",
+ "src/core/lib/transport/call_destination.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",
@@ -1761,6 +1762,8 @@ let package = Package(
"src/core/lib/transport/error_utils.cc",
"src/core/lib/transport/error_utils.h",
"src/core/lib/transport/http2_errors.h",
+ "src/core/lib/transport/interception_chain.cc",
+ "src/core/lib/transport/interception_chain.h",
"src/core/lib/transport/message.cc",
"src/core/lib/transport/message.h",
"src/core/lib/transport/metadata.cc",
diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl
index 096b07c8828..0bd711b6641 100644
--- a/bazel/experiments.bzl
+++ b/bazel/experiments.bzl
@@ -34,9 +34,8 @@ EXPERIMENT_ENABLES = {
"pending_queue_cap": "pending_queue_cap",
"pick_first_new": "pick_first_new",
"promise_based_client_call": "event_engine_client,event_engine_listener,promise_based_client_call",
- "promise_based_server_call": "promise_based_server_call",
- "chaotic_good": "chaotic_good,event_engine_client,event_engine_listener,promise_based_client_call,promise_based_server_call",
- "promise_based_inproc_transport": "event_engine_client,event_engine_listener,promise_based_client_call,promise_based_inproc_transport,promise_based_server_call",
+ "chaotic_good": "chaotic_good,event_engine_client,event_engine_listener,promise_based_client_call",
+ "promise_based_inproc_transport": "event_engine_client,event_engine_listener,promise_based_client_call,promise_based_inproc_transport",
"rstpit": "rstpit",
"schedule_cancellation_over_write": "schedule_cancellation_over_write",
"server_privacy": "server_privacy",
@@ -59,9 +58,6 @@ EXPERIMENTS = {
"dbg": {
},
"off": {
- "core_end2end_test": [
- "promise_based_server_call",
- ],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@@ -73,9 +69,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
- "logging_test": [
- "promise_based_server_call",
- ],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
@@ -107,9 +100,6 @@ EXPERIMENTS = {
"dbg": {
},
"off": {
- "core_end2end_test": [
- "promise_based_server_call",
- ],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@@ -121,9 +111,6 @@ EXPERIMENTS = {
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
],
- "logging_test": [
- "promise_based_server_call",
- ],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
@@ -149,7 +136,6 @@ EXPERIMENTS = {
"chaotic_good",
"event_engine_client",
"promise_based_client_call",
- "promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
@@ -168,9 +154,6 @@ EXPERIMENTS = {
"lame_client_test": [
"promise_based_client_call",
],
- "logging_test": [
- "promise_based_server_call",
- ],
"resource_quota_test": [
"free_large_allocator",
"unconstrained_max_quota_buffer_size",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 7458cc04b7f..b60ae5b5e2a 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -1110,6 +1110,7 @@ libs:
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
+ - src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@@ -1117,6 +1118,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
+ - src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@@ -1921,6 +1923,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
+ - src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
@@ -2609,6 +2612,7 @@ libs:
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_arena_allocator.h
+ - src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@@ -2616,6 +2620,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
+ - src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@@ -3032,6 +3037,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
+ - src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
@@ -4686,6 +4692,7 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/call_arena_allocator.h
+ - src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@@ -4693,6 +4700,7 @@ libs:
- src/core/lib/transport/custom_metadata.h
- src/core/lib/transport/error_utils.h
- src/core/lib/transport/http2_errors.h
+ - src/core/lib/transport/interception_chain.h
- src/core/lib/transport/message.h
- src/core/lib/transport/metadata.h
- src/core/lib/transport/metadata_batch.h
@@ -4989,6 +4997,7 @@ libs:
- src/core/lib/transport/call_spine.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/error_utils.cc
+ - src/core/lib/transport/interception_chain.cc
- src/core/lib/transport/message.cc
- src/core/lib/transport/metadata.cc
- src/core/lib/transport/metadata_batch.cc
diff --git a/config.m4 b/config.m4
index d44618c5775..4b72ff9577e 100644
--- a/config.m4
+++ b/config.m4
@@ -762,6 +762,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/transport/call_spine.cc \
src/core/lib/transport/connectivity_state.cc \
src/core/lib/transport/error_utils.cc \
+ src/core/lib/transport/interception_chain.cc \
src/core/lib/transport/message.cc \
src/core/lib/transport/metadata.cc \
src/core/lib/transport/metadata_batch.cc \
diff --git a/config.w32 b/config.w32
index 0d0a85c04d7..f71331b6d7d 100644
--- a/config.w32
+++ b/config.w32
@@ -727,6 +727,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\transport\\call_spine.cc " +
"src\\core\\lib\\transport\\connectivity_state.cc " +
"src\\core\\lib\\transport\\error_utils.cc " +
+ "src\\core\\lib\\transport\\interception_chain.cc " +
"src\\core\\lib\\transport\\message.cc " +
"src\\core\\lib\\transport\\metadata.cc " +
"src\\core\\lib\\transport\\metadata_batch.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index ec37a7d2469..530f61b07aa 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -1213,6 +1213,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
+ 'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@@ -1220,6 +1221,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
+ 'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
@@ -2482,6 +2484,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
+ 'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@@ -2489,6 +2492,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
+ 'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 5459da7206f..baff363da6c 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1864,6 +1864,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.cc',
'src/core/lib/transport/call_arena_allocator.h',
+ 'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
@@ -1876,6 +1877,8 @@ Pod::Spec.new do |s|
'src/core/lib/transport/error_utils.cc',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
+ 'src/core/lib/transport/interception_chain.cc',
+ 'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.cc',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.cc',
@@ -3261,6 +3264,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_arena_allocator.h',
+ 'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@@ -3268,6 +3272,7 @@ Pod::Spec.new do |s|
'src/core/lib/transport/custom_metadata.h',
'src/core/lib/transport/error_utils.h',
'src/core/lib/transport/http2_errors.h',
+ 'src/core/lib/transport/interception_chain.h',
'src/core/lib/transport/message.h',
'src/core/lib/transport/metadata.h',
'src/core/lib/transport/metadata_batch.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 44f58db3185..4a58496ff43 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1751,6 +1751,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_arena_allocator.cc )
s.files += %w( src/core/lib/transport/call_arena_allocator.h )
+ s.files += %w( src/core/lib/transport/call_destination.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )
@@ -1763,6 +1764,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/error_utils.cc )
s.files += %w( src/core/lib/transport/error_utils.h )
s.files += %w( src/core/lib/transport/http2_errors.h )
+ s.files += %w( src/core/lib/transport/interception_chain.cc )
+ s.files += %w( src/core/lib/transport/interception_chain.h )
s.files += %w( src/core/lib/transport/message.cc )
s.files += %w( src/core/lib/transport/message.h )
s.files += %w( src/core/lib/transport/metadata.cc )
diff --git a/package.xml b/package.xml
index 70f64915ef8..44f87605fa6 100644
--- a/package.xml
+++ b/package.xml
@@ -1733,6 +1733,7 @@
+
@@ -1745,6 +1746,8 @@
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index babf6143381..a9580c2aaa6 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -664,7 +664,10 @@ grpc_cc_library(
grpc_cc_library(
name = "promise_like",
- external_deps = ["absl/meta:type_traits"],
+ external_deps = [
+ "absl/functional:any_invocable",
+ "absl/meta:type_traits",
+ ],
language = "c++",
public_hdrs = [
"lib/promise/detail/promise_like.h",
@@ -3130,6 +3133,7 @@ grpc_cc_library(
"channel_fwd",
"channel_stack_trace",
"channel_stack_type",
+ "interception_chain",
"//:channel_stack_builder",
"//:debug_location",
"//:gpr",
@@ -3147,6 +3151,7 @@ grpc_cc_library(
deps = [
"channel_args",
"//:channelz",
+ "//:event_engine_base_hdrs",
"//:gpr_platform",
],
)
@@ -7087,6 +7092,7 @@ grpc_cc_library(
"experiments",
"iomgr_fwd",
"metadata_batch",
+ "resource_quota",
"slice",
"slice_buffer",
"status_helper",
diff --git a/src/core/ext/filters/logging/logging_filter.cc b/src/core/ext/filters/logging/logging_filter.cc
index 1c76e64b06a..587e87ff506 100644
--- a/src/core/ext/filters/logging/logging_filter.cc
+++ b/src/core/ext/filters/logging/logging_filter.cc
@@ -539,11 +539,11 @@ void RegisterLoggingFilter(LoggingSink* sink) {
g_logging_sink = sink;
CoreConfiguration::RegisterBuilder([](CoreConfiguration::Builder* builder) {
builder->channel_init()
- ->RegisterFilter(GRPC_SERVER_CHANNEL)
+ ->RegisterV2Filter(GRPC_SERVER_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
builder->channel_init()
- ->RegisterFilter(GRPC_CLIENT_CHANNEL)
+ ->RegisterV2Filter(GRPC_CLIENT_CHANNEL)
// TODO(yashykt) : Figure out a good place to place this channel arg
.IfChannelArg("grpc.experimental.enable_observability", true);
});
diff --git a/src/core/ext/transport/chaotic_good/server_transport.cc b/src/core/ext/transport/chaotic_good/server_transport.cc
index c139c6e44e0..98fbf4e915e 100644
--- a/src/core/ext/transport/chaotic_good/server_transport.cc
+++ b/src/core/ext/transport/chaotic_good/server_transport.cc
@@ -235,37 +235,27 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
FrameHeader frame_header, BufferPair buffers,
ChaoticGoodTransport& transport) {
ClientFragmentFrame fragment_frame;
- ScopedArenaPtr arena(acceptor_->CreateArena());
+ ScopedArenaPtr arena(call_arena_allocator_->MakeArena());
absl::Status status = transport.DeserializeFrame(
frame_header, std::move(buffers), arena.get(), fragment_frame,
FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
absl::optional call_initiator;
if (status.ok()) {
- auto create_call_result = acceptor_->CreateCall(
- std::move(fragment_frame.headers), arena.release());
- if (grpc_chaotic_good_trace.enabled()) {
- gpr_log(GPR_INFO,
- "CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "
- "create_call_result=%s",
- create_call_result.ok()
- ? "ok"
- : create_call_result.status().ToString().c_str());
- }
- if (create_call_result.ok()) {
- call_initiator.emplace(std::move(*create_call_result));
- auto add_result = NewStream(frame_header.stream_id, *call_initiator);
- if (add_result.ok()) {
- call_initiator->SpawnGuarded(
- "server-write", [this, stream_id = frame_header.stream_id,
- call_initiator = *call_initiator]() {
- return CallOutboundLoop(stream_id, call_initiator);
- });
- } else {
- call_initiator.reset();
- status = add_result;
- }
+ auto call =
+ MakeCallPair(std::move(fragment_frame.headers), event_engine_.get(),
+ arena.release(), call_arena_allocator_, nullptr);
+ call_initiator.emplace(std::move(call.initiator));
+ auto add_result = NewStream(frame_header.stream_id, *call_initiator);
+ if (add_result.ok()) {
+ call_destination_->StartCall(std::move(call.handler));
+ call_initiator->SpawnGuarded(
+ "server-write", [this, stream_id = frame_header.stream_id,
+ call_initiator = *call_initiator]() {
+ return CallOutboundLoop(stream_id, call_initiator);
+ });
} else {
- status = create_call_result.status();
+ call_initiator.reset();
+ status = add_result;
}
}
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
@@ -366,10 +356,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
PromiseEndpoint data_endpoint,
std::shared_ptr event_engine,
HPackParser hpack_parser, HPackCompressor hpack_encoder)
- : outgoing_frames_(4),
- allocator_(args.GetObject()
- ->memory_quota()
- ->CreateMemoryAllocator("chaotic-good")) {
+ : call_arena_allocator_(MakeRefCounted(
+ args.GetObject()
+ ->memory_quota()
+ ->CreateMemoryAllocator("chaotic-good"),
+ 1024)),
+ event_engine_(event_engine),
+ outgoing_frames_(4) {
auto transport = MakeRefCounted(
std::move(control_endpoint), std::move(data_endpoint),
std::move(hpack_parser), std::move(hpack_encoder));
@@ -381,20 +374,25 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
OnTransportActivityDone("reader"));
}
-void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
- CHECK_EQ(acceptor_, nullptr);
- CHECK_NE(acceptor, nullptr);
- acceptor_ = acceptor;
+void ChaoticGoodServerTransport::SetCallDestination(
+ RefCountedPtr call_destination) {
+ CHECK(call_destination_ == nullptr);
+ CHECK(call_destination != nullptr);
+ call_destination_ = call_destination;
got_acceptor_.Set();
}
-ChaoticGoodServerTransport::~ChaoticGoodServerTransport() {
- if (writer_ != nullptr) {
- writer_.reset();
- }
- if (reader_ != nullptr) {
- reader_.reset();
+void ChaoticGoodServerTransport::Orphan() {
+ ActivityPtr writer;
+ ActivityPtr reader;
+ {
+ MutexLock lock(&mu_);
+ writer = std::move(writer_);
+ reader = std::move(reader_);
}
+ writer.reset();
+ reader.reset();
+ Unref();
}
void ChaoticGoodServerTransport::AbortWithError() {
diff --git a/src/core/ext/transport/chaotic_good/server_transport.h b/src/core/ext/transport/chaotic_good/server_transport.h
index a34ac92b73e..140641abcf6 100644
--- a/src/core/ext/transport/chaotic_good/server_transport.h
+++ b/src/core/ext/transport/chaotic_good/server_transport.h
@@ -86,7 +86,6 @@ class ChaoticGoodServerTransport final : public ServerTransport {
std::shared_ptr
event_engine,
HPackParser hpack_parser, HPackCompressor hpack_encoder);
- ~ChaoticGoodServerTransport() override;
FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return nullptr; }
@@ -96,9 +95,10 @@ class ChaoticGoodServerTransport final : public ServerTransport {
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; }
- void Orphan() override { Unref(); }
+ void Orphan() override;
- void SetAcceptor(Acceptor* acceptor) override;
+ void SetCallDestination(
+ RefCountedPtr call_destination) override;
void AbortWithError();
private:
@@ -137,7 +137,10 @@ class ChaoticGoodServerTransport final : public ServerTransport {
auto PushFragmentIntoCall(CallInitiator call_initiator,
ClientFragmentFrame frame, uint32_t stream_id);
- Acceptor* acceptor_ = nullptr;
+ RefCountedPtr call_destination_;
+ const RefCountedPtr call_arena_allocator_;
+ const std::shared_ptr
+ event_engine_;
InterActivityLatch got_acceptor_;
MpscReceiver outgoing_frames_;
// Assigned aligned bytes from setting frame.
@@ -146,7 +149,6 @@ class ChaoticGoodServerTransport final : public ServerTransport {
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
- grpc_event_engine::experimental::MemoryAllocator allocator_;
ActivityPtr writer_ ABSL_GUARDED_BY(mu_);
ActivityPtr reader_ ABSL_GUARDED_BY(mu_);
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index 7dd316bfca8..dbbd3e63123 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -28,6 +28,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/surface/channel_create.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server.h"
@@ -39,8 +40,18 @@ class InprocClientTransport;
class InprocServerTransport final : public ServerTransport {
public:
- void SetAcceptor(Acceptor* acceptor) override {
- acceptor_ = acceptor;
+ explicit InprocServerTransport(const ChannelArgs& args)
+ : event_engine_(
+ args.GetObjectRef()),
+ call_arena_allocator_(MakeRefCounted(
+ args.GetObject()
+ ->memory_quota()
+ ->CreateMemoryAllocator("inproc_server"),
+ 1024)) {}
+
+ void SetCallDestination(
+ RefCountedPtr unstarted_call_handler) override {
+ unstarted_call_handler_ = unstarted_call_handler;
ConnectionState expect = ConnectionState::kInitial;
state_.compare_exchange_strong(expect, ConnectionState::kReady,
std::memory_order_acq_rel,
@@ -95,7 +106,11 @@ class InprocServerTransport final : public ServerTransport {
case ConnectionState::kReady:
break;
}
- return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
+ auto* arena = call_arena_allocator_->MakeArena();
+ auto server_call = MakeCallPair(std::move(md), event_engine_.get(), arena,
+ call_arena_allocator_, nullptr);
+ unstarted_call_handler_->StartCall(std::move(server_call.handler));
+ return std::move(server_call.initiator);
}
OrphanablePtr MakeClientTransport();
@@ -105,11 +120,14 @@ class InprocServerTransport final : public ServerTransport {
std::atomic state_{ConnectionState::kInitial};
std::atomic disconnecting_{false};
- Acceptor* acceptor_;
+ RefCountedPtr unstarted_call_handler_;
absl::Status disconnect_error_;
Mutex state_tracker_mu_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(state_tracker_mu_){
"inproc_server_transport", GRPC_CHANNEL_CONNECTING};
+ const std::shared_ptr
+ event_engine_;
+ const RefCountedPtr call_arena_allocator_;
};
class InprocClientTransport final : public ClientTransport {
@@ -118,16 +136,19 @@ class InprocClientTransport final : public ClientTransport {
RefCountedPtr server_transport)
: server_transport_(std::move(server_transport)) {}
- void StartCall(CallHandler call_handler) override {
- call_handler.SpawnGuarded(
+ void StartCall(CallHandler child_call_handler) override {
+ child_call_handler.SpawnGuarded(
"pull_initial_metadata",
- TrySeq(call_handler.PullClientInitialMetadata(),
+ TrySeq(child_call_handler.PullClientInitialMetadata(),
[server_transport = server_transport_,
- call_handler](ClientMetadataHandle md) {
- auto call_initiator =
+ child_call_handler](ClientMetadataHandle md) {
+ auto server_call_initiator =
server_transport->AcceptCall(std::move(md));
- if (!call_initiator.ok()) return call_initiator.status();
- ForwardCall(call_handler, std::move(*call_initiator));
+ if (!server_call_initiator.ok()) {
+ return server_call_initiator.status();
+ }
+ ForwardCall(child_call_handler,
+ std::move(*server_call_initiator));
return absl::OkStatus();
}));
}
@@ -155,7 +176,6 @@ class InprocClientTransport final : public ClientTransport {
bool UsePromiseBasedTransport() {
if (!IsPromiseBasedInprocTransportEnabled()) return false;
CHECK(IsPromiseBasedClientCallEnabled());
- CHECK(IsPromiseBasedServerCallEnabled());
return true;
}
@@ -180,7 +200,7 @@ OrphanablePtr MakeLameChannel(absl::string_view why,
OrphanablePtr MakeInprocChannel(Server* server,
ChannelArgs client_channel_args) {
- auto transports = MakeInProcessTransportPair();
+ auto transports = MakeInProcessTransportPair(server->channel_args());
auto client_transport = std::move(transports.first);
auto server_transport = std::move(transports.second);
auto error =
@@ -205,8 +225,9 @@ OrphanablePtr MakeInprocChannel(Server* server,
} // namespace
std::pair, OrphanablePtr>
-MakeInProcessTransportPair() {
- auto server_transport = MakeOrphanable();
+MakeInProcessTransportPair(const ChannelArgs& server_channel_args) {
+ auto server_transport =
+ MakeOrphanable(server_channel_args);
auto client_transport = server_transport->MakeClientTransport();
return std::make_pair(std::move(client_transport),
std::move(server_transport));
diff --git a/src/core/ext/transport/inproc/inproc_transport.h b/src/core/ext/transport/inproc/inproc_transport.h
index 676b5f1fa32..46dc9b540f1 100644
--- a/src/core/ext/transport/inproc/inproc_transport.h
+++ b/src/core/ext/transport/inproc/inproc_transport.h
@@ -30,7 +30,7 @@ extern grpc_core::TraceFlag grpc_inproc_trace;
namespace grpc_core {
std::pair, OrphanablePtr>
-MakeInProcessTransportPair();
+MakeInProcessTransportPair(const ChannelArgs& server_channel_args);
}
diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc
index 9b28a321db1..c428bac0006 100644
--- a/src/core/lib/channel/channel_stack.cc
+++ b/src/core/lib/channel/channel_stack.cc
@@ -305,13 +305,6 @@ grpc_core::NextPromiseFactory ClientNext(grpc_channel_element* elem) {
};
}
-grpc_core::NextPromiseFactory ServerNext(grpc_channel_element* elem) {
- return [elem](grpc_core::CallArgs args) {
- return elem->filter->make_call_promise(elem, std::move(args),
- ServerNext(elem - 1));
- };
-}
-
} // namespace
grpc_core::ArenaPromise
@@ -319,12 +312,6 @@ grpc_channel_stack::MakeClientCallPromise(grpc_core::CallArgs call_args) {
return ClientNext(grpc_channel_stack_element(this, 0))(std::move(call_args));
}
-grpc_core::ArenaPromise
-grpc_channel_stack::MakeServerCallPromise(grpc_core::CallArgs call_args) {
- return ServerNext(grpc_channel_stack_element(this, this->count - 1))(
- std::move(call_args));
-}
-
void grpc_channel_stack::InitClientCallSpine(
grpc_core::CallSpineInterface* call) {
for (size_t i = 0; i < count; i++) {
@@ -338,19 +325,6 @@ void grpc_channel_stack::InitClientCallSpine(
}
}
-void grpc_channel_stack::InitServerCallSpine(
- grpc_core::CallSpineInterface* call) {
- for (size_t i = 0; i < count; i++) {
- auto* elem = grpc_channel_stack_element(this, count - 1 - i);
- if (elem->filter->init_call == nullptr) {
- grpc_core::Crash(
- absl::StrCat("Filter '", elem->filter->name,
- "' does not support the call-v3 interface"));
- }
- elem->filter->init_call(elem, call);
- }
-}
-
void grpc_call_log_op(const char* file, int line, gpr_log_severity severity,
grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index affd34a4191..030173fcd31 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -241,7 +241,6 @@ struct grpc_channel_stack {
MakeServerCallPromise(grpc_core::CallArgs call_args);
void InitClientCallSpine(grpc_core::CallSpineInterface* call);
- void InitServerCallSpine(grpc_core::CallSpineInterface* call);
};
// A call stack tracks a set of related filters for one call, and guarantees
diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc
index c5e2d3fb3b9..4bf01759fc2 100644
--- a/src/core/lib/experiments/experiments.cc
+++ b/src/core/lib/experiments/experiments.cc
@@ -88,23 +88,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast(grpc_core::kExperimentIdEventEngineClient),
static_cast(grpc_core::kExperimentIdEventEngineListener)};
-const char* const description_promise_based_server_call =
- "If set, use the new gRPC promise based call code when it's appropriate "
- "(ie when all filters in a stack are promise based)";
-const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@@ -184,15 +178,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
- {"promise_based_server_call", description_promise_based_server_call,
- additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
- additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
+ additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
- required_experiments_promise_based_inproc_transport, 2, false, false},
+ required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
@@ -288,23 +280,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast(grpc_core::kExperimentIdEventEngineClient),
static_cast(grpc_core::kExperimentIdEventEngineListener)};
-const char* const description_promise_based_server_call =
- "If set, use the new gRPC promise based call code when it's appropriate "
- "(ie when all filters in a stack are promise based)";
-const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@@ -384,15 +370,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
- {"promise_based_server_call", description_promise_based_server_call,
- additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
- additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
+ additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
- required_experiments_promise_based_inproc_transport, 2, false, false},
+ required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
@@ -488,23 +472,17 @@ const char* const additional_constraints_promise_based_client_call = "{}";
const uint8_t required_experiments_promise_based_client_call[] = {
static_cast(grpc_core::kExperimentIdEventEngineClient),
static_cast(grpc_core::kExperimentIdEventEngineListener)};
-const char* const description_promise_based_server_call =
- "If set, use the new gRPC promise based call code when it's appropriate "
- "(ie when all filters in a stack are promise based)";
-const char* const additional_constraints_promise_based_server_call = "{}";
const char* const description_chaotic_good =
"If set, enable the chaotic good load transport (this is mostly here for "
"testing)";
const char* const additional_constraints_chaotic_good = "{}";
const uint8_t required_experiments_chaotic_good[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_promise_based_inproc_transport =
"Use promises for the in-process transport.";
const char* const additional_constraints_promise_based_inproc_transport = "{}";
const uint8_t required_experiments_promise_based_inproc_transport[] = {
- static_cast(grpc_core::kExperimentIdPromiseBasedClientCall),
- static_cast(grpc_core::kExperimentIdPromiseBasedServerCall)};
+ static_cast(grpc_core::kExperimentIdPromiseBasedClientCall)};
const char* const description_rstpit =
"On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short "
"duration";
@@ -584,15 +562,13 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"promise_based_client_call", description_promise_based_client_call,
additional_constraints_promise_based_client_call,
required_experiments_promise_based_client_call, 2, false, true},
- {"promise_based_server_call", description_promise_based_server_call,
- additional_constraints_promise_based_server_call, nullptr, 0, false, true},
{"chaotic_good", description_chaotic_good,
- additional_constraints_chaotic_good, required_experiments_chaotic_good, 2,
+ additional_constraints_chaotic_good, required_experiments_chaotic_good, 1,
false, true},
{"promise_based_inproc_transport",
description_promise_based_inproc_transport,
additional_constraints_promise_based_inproc_transport,
- required_experiments_promise_based_inproc_transport, 2, false, false},
+ required_experiments_promise_based_inproc_transport, 1, false, false},
{"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0,
false, true},
{"schedule_cancellation_over_write",
diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h
index 65fb7ebfa80..16220be2591 100644
--- a/src/core/lib/experiments/experiments.h
+++ b/src/core/lib/experiments/experiments.h
@@ -79,7 +79,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
-inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@@ -119,7 +118,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
-inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@@ -159,7 +157,6 @@ inline bool IsPendingQueueCapEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW
inline bool IsPickFirstNewEnabled() { return true; }
inline bool IsPromiseBasedClientCallEnabled() { return false; }
-inline bool IsPromiseBasedServerCallEnabled() { return false; }
inline bool IsChaoticGoodEnabled() { return false; }
inline bool IsPromiseBasedInprocTransportEnabled() { return false; }
inline bool IsRstpitEnabled() { return false; }
@@ -195,7 +192,6 @@ enum ExperimentIds {
kExperimentIdPendingQueueCap,
kExperimentIdPickFirstNew,
kExperimentIdPromiseBasedClientCall,
- kExperimentIdPromiseBasedServerCall,
kExperimentIdChaoticGood,
kExperimentIdPromiseBasedInprocTransport,
kExperimentIdRstpit,
@@ -277,10 +273,6 @@ inline bool IsPickFirstNewEnabled() {
inline bool IsPromiseBasedClientCallEnabled() {
return IsExperimentEnabled(kExperimentIdPromiseBasedClientCall);
}
-#define GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
-inline bool IsPromiseBasedServerCallEnabled() {
- return IsExperimentEnabled(kExperimentIdPromiseBasedServerCall);
-}
#define GRPC_EXPERIMENT_IS_INCLUDED_CHAOTIC_GOOD
inline bool IsChaoticGoodEnabled() {
return IsExperimentEnabled(kExperimentIdChaoticGood);
diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml
index 0712c8afd85..0e00f5c4d68 100644
--- a/src/core/lib/experiments/experiments.yaml
+++ b/src/core/lib/experiments/experiments.yaml
@@ -64,7 +64,7 @@
If set, enable the chaotic good load transport (this is mostly here for testing)
expiry: 2024/09/09
owner: ctiller@google.com
- requires: [promise_based_client_call, promise_based_server_call]
+ requires: [promise_based_client_call]
test_tags: [core_end2end_test]
- name: client_privacy
description:
@@ -170,14 +170,7 @@
owner: ctiller@google.com
test_tags: []
allow_in_fuzzing_config: false # experiment currently crashes if enabled
- requires: [promise_based_client_call, promise_based_server_call]
-- name: promise_based_server_call
- description:
- If set, use the new gRPC promise based call code when it's appropriate
- (ie when all filters in a stack are promise based)
- expiry: 2024/06/14
- owner: ctiller@google.com
- test_tags: ["core_end2end_test", "logging_test"]
+ requires: [promise_based_client_call]
- name: rstpit
description:
On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration
diff --git a/src/core/lib/experiments/rollouts.yaml b/src/core/lib/experiments/rollouts.yaml
index f9d968d62a2..97018a672cd 100644
--- a/src/core/lib/experiments/rollouts.yaml
+++ b/src/core/lib/experiments/rollouts.yaml
@@ -94,8 +94,6 @@
ios: broken
windows: broken
posix: false
-- name: promise_based_server_call
- default: false
- name: rstpit
default: false
- name: schedule_cancellation_over_write
diff --git a/src/core/lib/promise/detail/promise_like.h b/src/core/lib/promise/detail/promise_like.h
index a8b80172c22..8e73577675a 100644
--- a/src/core/lib/promise/detail/promise_like.h
+++ b/src/core/lib/promise/detail/promise_like.h
@@ -17,6 +17,7 @@
#include
+#include "absl/functional/any_invocable.h"
#include "absl/meta/type_traits.h"
#include
@@ -63,6 +64,10 @@ auto WrapInPoll(T&& x) -> decltype(PollWrapper::Wrap(std::forward(x))) {
return PollWrapper::Wrap(std::forward(x));
}
+// T -> T, const T& -> T
+template
+using RemoveCVRef = absl::remove_cv_t>;
+
template
class PromiseLike;
@@ -73,7 +78,7 @@ template
class PromiseLike::type>::value>> {
private:
- GPR_NO_UNIQUE_ADDRESS F f_;
+ GPR_NO_UNIQUE_ADDRESS RemoveCVRef f_;
public:
// NOLINTNEXTLINE - internal detail that drastically simplifies calling code.
@@ -82,10 +87,6 @@ class PromiseLike::Type;
};
-// T -> T, const T& -> T
-template
-using RemoveCVRef = absl::remove_cv_t>;
-
} // namespace promise_detail
} // namespace grpc_core
diff --git a/src/core/lib/promise/status_flag.h b/src/core/lib/promise/status_flag.h
index 38d4cf5b720..132b2079c07 100644
--- a/src/core/lib/promise/status_flag.h
+++ b/src/core/lib/promise/status_flag.h
@@ -171,6 +171,8 @@ class ValueOrFailure {
T& value() { return value_.value(); }
const T& operator*() const { return *value_; }
T& operator*() { return *value_; }
+ const T* operator->() const { return &*value_; }
+ T* operator->() { return &*value_; }
bool operator==(const ValueOrFailure& other) const {
return value_ == other.value_;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 6d2457dba2d..0d268a21572 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -125,7 +125,7 @@ using GrpcClosure = Closure;
Call::ParentCall* Call::GetOrCreateParentCall() {
ParentCall* p = parent_call_.load(std::memory_order_acquire);
if (p == nullptr) {
- p = arena_->New();
+ p = arena()->New();
ParentCall* expected = nullptr;
if (!parent_call_.compare_exchange_strong(expected, p,
std::memory_order_release,
@@ -244,28 +244,6 @@ void Call::PropagateCancellationToChildren() {
}
}
-char* Call::GetPeer() {
- Slice peer_slice = GetPeerString();
- if (!peer_slice.empty()) {
- absl::string_view peer_string_view = peer_slice.as_string_view();
- char* peer_string =
- static_cast(gpr_malloc(peer_string_view.size() + 1));
- memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
- peer_string[peer_string_view.size()] = '\0';
- return peer_string;
- }
- char* peer_string = grpc_channel_get_target(channel_->c_ptr());
- if (peer_string != nullptr) return peer_string;
- return gpr_strdup("unknown");
-}
-
-void Call::DeleteThis() {
- RefCountedPtr channel = std::move(channel_);
- Arena* arena = arena_;
- this->~Call();
- channel->DestroyArena(arena);
-}
-
void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
grpc_metadata_batch& md) {
// TODO(juanlishen): If the user has already specified a compression
@@ -280,7 +258,7 @@ void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
op.data.send_initial_metadata.maybe_compression_level.level;
level_set = true;
} else {
- const grpc_compression_options copts = channel()->compression_options();
+ const grpc_compression_options copts = compression_options();
if (copts.default_level.is_set) {
level_set = true;
effective_compression_level = copts.default_level.level;
@@ -312,13 +290,12 @@ void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) {
md.Take(GrpcAcceptEncodingMetadata())
.value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE});
- const grpc_compression_options compression_options =
- channel_->compression_options();
+ const grpc_compression_options copts = compression_options();
const grpc_compression_algorithm compression_algorithm =
incoming_compression_algorithm_;
- if (GPR_UNLIKELY(!CompressionAlgorithmSet::FromUint32(
- compression_options.enabled_algorithms_bitset)
- .IsSet(compression_algorithm))) {
+ if (GPR_UNLIKELY(
+ !CompressionAlgorithmSet::FromUint32(copts.enabled_algorithms_bitset)
+ .IsSet(compression_algorithm))) {
// check if algorithm is supported by current channel config
HandleCompressionAlgorithmDisabled(compression_algorithm);
}
@@ -368,22 +345,20 @@ void Call::UpdateDeadline(Timestamp deadline) {
StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
return;
}
- auto* const event_engine = channel()->event_engine();
if (deadline_ != Timestamp::InfFuture()) {
- if (!event_engine->Cancel(deadline_task_)) return;
+ if (!event_engine_->Cancel(deadline_task_)) return;
} else {
InternalRef("deadline");
}
deadline_ = deadline;
- deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
+ deadline_task_ = event_engine_->RunAfter(deadline - Timestamp::Now(), this);
}
void Call::ResetDeadline() {
{
MutexLock lock(&deadline_mu_);
if (deadline_ == Timestamp::InfFuture()) return;
- auto* const event_engine = channel()->event_engine();
- if (!event_engine->Cancel(deadline_task_)) return;
+ if (!event_engine_->Cancel(deadline_task_)) return;
deadline_ = Timestamp::InfFuture();
}
InternalUnref("deadline[reset]");
@@ -398,11 +373,69 @@ void Call::Run() {
InternalUnref("deadline[run]");
}
+///////////////////////////////////////////////////////////////////////////////
+// ChannelBasedCall
+// TODO(ctiller): once we remove the v2 client code this can be folded into
+// FilterStackCall
+
+class ChannelBasedCall : public Call {
+ protected:
+ ChannelBasedCall(Arena* arena, bool is_client, Timestamp send_deadline,
+ RefCountedPtr channel)
+ : Call(is_client, send_deadline, channel->event_engine()),
+ arena_(arena),
+ channel_(std::move(channel)) {
+ DCHECK_NE(arena_, nullptr);
+ }
+
+ Arena* arena() final { return arena_; }
+
+ char* GetPeer() final {
+ Slice peer_slice = GetPeerString();
+ if (!peer_slice.empty()) {
+ absl::string_view peer_string_view = peer_slice.as_string_view();
+ char* peer_string =
+ static_cast(gpr_malloc(peer_string_view.size() + 1));
+ memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
+ peer_string[peer_string_view.size()] = '\0';
+ return peer_string;
+ }
+ char* peer_string = grpc_channel_get_target(channel_->c_ptr());
+ if (peer_string != nullptr) return peer_string;
+ return gpr_strdup("unknown");
+ }
+
+ grpc_event_engine::experimental::EventEngine* event_engine() const override {
+ return channel_->event_engine();
+ }
+
+ grpc_compression_options compression_options() override {
+ return channel_->compression_options();
+ }
+
+ void DeleteThis() {
+ RefCountedPtr channel = std::move(channel_);
+ Arena* arena = arena_;
+ this->~ChannelBasedCall();
+ channel->DestroyArena(arena);
+ }
+
+ Channel* channel() const { return channel_.get(); }
+
+ protected:
+ // Non-virtual arena accessor -- needed by PipeBasedCall
+ Arena* GetArena() { return arena_; }
+
+ private:
+ Arena* const arena_;
+ RefCountedPtr channel_;
+};
+
///////////////////////////////////////////////////////////////////////////////
// FilterStackCall
// To be removed once promise conversion is complete
-class FilterStackCall final : public Call {
+class FilterStackCall final : public ChannelBasedCall {
public:
~FilterStackCall() override {
for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
@@ -431,10 +464,6 @@ class FilterStackCall final : public Call {
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
}
- grpc_event_engine::experimental::EventEngine* event_engine() const override {
- return channel()->event_engine();
- }
-
grpc_call_element* call_elem(size_t idx) {
return grpc_call_stack_element(call_stack(), idx);
}
@@ -570,8 +599,8 @@ class FilterStackCall final : public Call {
};
FilterStackCall(Arena* arena, const grpc_call_create_args& args)
- : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
- args.channel->Ref()),
+ : ChannelBasedCall(arena, args.server_transport_data == nullptr,
+ args.send_deadline, args.channel->Ref()),
cq_(args.cq),
stream_op_payload_(context_) {
context_[GRPC_CONTEXT_CALL].value = this;
@@ -1874,15 +1903,15 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
// PromiseBasedCall
// Will be folded into Call once the promise conversion is done
-class BasicPromiseBasedCall : public Call, public Party {
+class BasicPromiseBasedCall : public ChannelBasedCall, public Party {
public:
using Call::arena;
BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
uint32_t initial_internal_refs,
const grpc_call_create_args& args)
- : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
- args.channel->Ref()),
+ : ChannelBasedCall(arena, args.server_transport_data == nullptr,
+ args.send_deadline, args.channel->Ref()),
Party(initial_internal_refs),
external_refs_(initial_external_refs),
cq_(args.cq) {
@@ -1903,7 +1932,6 @@ class BasicPromiseBasedCall : public Call, public Party {
virtual void OrphanCall() = 0;
- virtual ServerCallContext* server_call_context() { return nullptr; }
void SetCompletionQueue(grpc_completion_queue* cq) final {
cq_ = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
@@ -2533,10 +2561,6 @@ void CallContext::IncrementRefCount(const char* reason) {
void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); }
-ServerCallContext* CallContext::server_call_context() {
- return call_->server_call_context();
-}
-
RefCountedPtr CallContext::MakeCallSpine(
CallArgs call_args) {
return call_->MakeCallSpine(std::move(call_args));
@@ -2780,12 +2804,12 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
void PublishInitialMetadata(ServerMetadata* metadata);
ClientMetadataHandle send_initial_metadata_;
- Pipe server_initial_metadata_{arena()};
+ Pipe server_initial_metadata_{GetArena()};
Latch server_trailing_metadata_;
Latch cancel_error_;
Latch polling_entity_;
- Pipe client_to_server_messages_{arena()};
- Pipe server_to_client_messages_{arena()};
+ Pipe client_to_server_messages_{GetArena()};
+ Pipe server_to_client_messages_{GetArena()};
bool is_trailers_only_ = false;
bool scheduled_receive_status_ = false;
bool scheduled_send_close_ = false;
@@ -3092,242 +3116,7 @@ void ClientPromiseBasedCall::StartRecvStatusOnClient(
#endif
///////////////////////////////////////////////////////////////////////////////
-// ServerPromiseBasedCall
-
-#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
-
-class ServerPromiseBasedCall final : public PromiseBasedCall,
- public ServerCallContext {
- public:
- ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args);
-
- void OrphanCall() override {}
- void CancelWithError(grpc_error_handle) override;
- grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
- bool is_notify_tag_closure) override;
- bool is_trailers_only() const override {
- Crash("is_trailers_only not implemented for server calls");
- }
- absl::string_view GetServerAuthority() const override {
- const Slice* authority_metadata =
- client_initial_metadata_->get_pointer(HttpAuthorityMetadata());
- if (authority_metadata == nullptr) return "";
- return authority_metadata->as_string_view();
- }
-
- // Polling order for the server promise stack:
- //
- // │ ┌───────────────────────────────────────┐
- // │ │ ServerPromiseBasedCall ├──► Lifetime management
- // │ ├───────────────────────────────────────┤
- // │ │ ConnectedChannel ├─┐
- // │ ├───────────────────────────────────────┤ └► Interactions with the
- // │ │ ... closest to transport filter │ transport - send/recv msgs
- // │ ├───────────────────────────────────────┤ and metadata, call phase
- // │ │ ... │ ordering
- // │ ├───────────────────────────────────────┤
- // │ │ ... closest to app filter │ ┌► Request matching, initial
- // │ ├───────────────────────────────────────┤ │ setup, publishing call to
- // │ │ Server::ChannelData::MakeCallPromise ├─┘ application
- // │ ├───────────────────────────────────────┤
- // │ │ MakeTopOfServerCallPromise ├──► Send trailing metadata
- // ▼ └───────────────────────────────────────┘
- // Polling &
- // instantiation
- // order
-
- std::string DebugTag() const override {
- return absl::StrFormat("SERVER_CALL[%p]: ", this);
- }
-
- ServerCallContext* server_call_context() override { return this; }
-
- const void* server_stream_data() override { return server_transport_data_; }
- void PublishInitialMetadata(
- ClientMetadataHandle metadata,
- grpc_metadata_array* publish_initial_metadata) override;
- ArenaPromise MakeTopOfServerCallPromise(
- CallArgs call_args, grpc_completion_queue* cq,
- absl::FunctionRef publish) override;
-
- private:
- class RecvCloseOpCancelState {
- public:
- // Request that receiver be filled in per
- // grpc_op_recv_close_on_server. Returns true if the request can
- // be fulfilled immediately. Returns false if the request will be
- // fulfilled later.
- bool ReceiveCloseOnServerOpStarted(int* receiver) {
- uintptr_t state = state_.load(std::memory_order_acquire);
- uintptr_t new_state;
- do {
- switch (state) {
- case kUnset:
- new_state = reinterpret_cast(receiver);
- break;
- case kFinishedWithFailure:
- *receiver = 1;
- return true;
- case kFinishedWithSuccess:
- *receiver = 0;
- return true;
- default:
- Crash("Two threads offered ReceiveCloseOnServerOpStarted");
- }
- } while (!state_.compare_exchange_weak(state, new_state,
- std::memory_order_acq_rel,
- std::memory_order_acquire));
- return false;
- }
-
- // Mark the call as having completed.
- // Returns true if this finishes a previous
- // RequestReceiveCloseOnServer.
- bool CompleteCallWithCancelledSetTo(bool cancelled) {
- uintptr_t state = state_.load(std::memory_order_acquire);
- uintptr_t new_state;
- bool r;
- do {
- switch (state) {
- case kUnset:
- new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
- r = false;
- break;
- case kFinishedWithFailure:
- return false;
- case kFinishedWithSuccess:
- Crash("unreachable");
- default:
- new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess;
- r = true;
- }
- } while (!state_.compare_exchange_weak(state, new_state,
- std::memory_order_acq_rel,
- std::memory_order_acquire));
- if (r) *reinterpret_cast(state) = cancelled ? 1 : 0;
- return r;
- }
-
- std::string ToString() const {
- auto state = state_.load(std::memory_order_relaxed);
- switch (state) {
- case kUnset:
- return "Unset";
- case kFinishedWithFailure:
- return "FinishedWithFailure";
- case kFinishedWithSuccess:
- return "FinishedWithSuccess";
- default:
- return absl::StrFormat("WaitingForReceiver(%p)",
- reinterpret_cast(state));
- }
- }
-
- private:
- static constexpr uintptr_t kUnset = 0;
- static constexpr uintptr_t kFinishedWithFailure = 1;
- static constexpr uintptr_t kFinishedWithSuccess = 2;
- // Holds one of kUnset, kFinishedWithFailure, or
- // kFinishedWithSuccess OR an int* that wants to receive the
- // final status.
- std::atomic state_{kUnset};
- };
-
- void CommitBatch(const grpc_op* ops, size_t nops,
- const Completion& completion);
- void Finish(ServerMetadataHandle result);
-
- ServerInterface* const server_;
- const void* const server_transport_data_;
- PipeSender* server_initial_metadata_ = nullptr;
- PipeSender* server_to_client_messages_ = nullptr;
- PipeReceiver* client_to_server_messages_ = nullptr;
- Latch send_trailing_metadata_;
- RecvCloseOpCancelState recv_close_op_cancel_state_;
- ClientMetadataHandle client_initial_metadata_;
- Completion recv_close_completion_;
- std::atomic cancelled_{false};
-};
-
-ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
- grpc_call_create_args* args)
- : PromiseBasedCall(arena, 0, *args),
- server_(args->server),
- server_transport_data_(args->server_transport_data) {
- global_stats().IncrementServerCallsCreated();
- channelz::ServerNode* channelz_node = server_->channelz_node();
- if (channelz_node != nullptr) {
- channelz_node->RecordCallStarted();
- }
- ScopedContext activity_context(this);
- // TODO(yashykt): In the future, we want to also enable stats and trace
- // collecting from when the call is created at the transport. The idea is that
- // the transport would create the call tracer and pass it in as part of the
- // metadata.
- // TODO(yijiem): OpenCensus and internal Census is still using this way to
- // set server call tracer. We need to refactor them to stats plugins
- // (including removing the client channel filters).
- if (args->server != nullptr &&
- args->server->server_call_tracer_factory() != nullptr) {
- auto* server_call_tracer =
- args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
- arena, args->server->channel_args());
- if (server_call_tracer != nullptr) {
- // Note that we are setting both
- // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
- // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
- // promise-based world, we would just a single tracer object for each
- // stack (call, subchannel_call, server_call.)
- ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE,
- server_call_tracer, nullptr);
- ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr);
- }
- }
- args->channel->channel_stack()->stats_plugin_group->AddServerCallTracers(
- context());
- Spawn("server_promise",
- channel()->channel_stack()->MakeServerCallPromise(
- CallArgs{nullptr, ClientInitialMetadataOutstandingToken::Empty(),
- nullptr, nullptr, nullptr, nullptr}),
- [this](ServerMetadataHandle result) { Finish(std::move(result)); });
-}
-
-void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[call] Finish: recv_close_state:%s result:%s",
- DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str(),
- result->DebugString().c_str());
- }
- const auto status =
- result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
- channelz::ServerNode* channelz_node = server_->channelz_node();
- if (channelz_node != nullptr) {
- if (status == GRPC_STATUS_OK) {
- channelz_node->RecordCallSucceeded();
- } else {
- channelz_node->RecordCallFailed();
- }
- }
- bool was_cancelled = result->get(GrpcCallWasCancelled()).value_or(true);
- if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo(
- was_cancelled)) {
- FinishOpOnCompletion(&recv_close_completion_,
- PendingOp::kReceiveCloseOnServer);
- }
- if (was_cancelled) set_failed_before_recv_message();
- if (server_initial_metadata_ != nullptr) {
- server_initial_metadata_->Close();
- }
- Slice message_slice;
- if (Slice* message = result->get_pointer(GrpcMessageMetadata())) {
- message_slice = message->Ref();
- }
- AcceptTransportStatsFromContext();
- SetFinalizationStatus(status, std::move(message_slice));
- set_completed();
- ResetDeadline();
- PropagateCancellationToChildren();
-}
+// CallSpine based Server Call
grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
BitSet<8> got_ops;
@@ -3371,234 +3160,33 @@ grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
return GRPC_CALL_OK;
}
-void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
- const Completion& completion) {
- Party::BulkSpawner spawner(this);
- for (size_t op_idx = 0; op_idx < nops; op_idx++) {
- const grpc_op& op = ops[op_idx];
- switch (op.op) {
- case GRPC_OP_SEND_INITIAL_METADATA: {
- auto metadata = arena()->MakePooled();
- PrepareOutgoingInitialMetadata(op, *metadata);
- CToMetadata(op.data.send_initial_metadata.metadata,
- op.data.send_initial_metadata.count, metadata.get());
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[call] Send initial metadata",
- DebugTag().c_str());
- }
- QueueSend();
- spawner.Spawn(
- "call_send_initial_metadata",
- [this, metadata = std::move(metadata)]() mutable {
- EnactSend();
- return server_initial_metadata_->Push(std::move(metadata));
- },
- [this,
- completion = AddOpToCompletion(
- completion, PendingOp::kSendInitialMetadata)](bool r) mutable {
- if (!r) {
- set_failed_before_recv_message();
- FailCompletion(completion);
- }
- FinishOpOnCompletion(&completion,
- PendingOp::kSendInitialMetadata);
- });
- } break;
- case GRPC_OP_SEND_MESSAGE:
- StartSendMessage(op, completion, server_to_client_messages_, spawner);
- break;
- case GRPC_OP_RECV_MESSAGE:
- if (cancelled_.load(std::memory_order_relaxed)) {
- set_failed_before_recv_message();
- FailCompletion(completion);
- break;
- }
- StartRecvMessage(
- op, completion, []() { return []() { return Empty{}; }; },
- client_to_server_messages_, true, spawner);
- break;
- case GRPC_OP_SEND_STATUS_FROM_SERVER: {
- auto metadata = arena()->MakePooled();
- CToMetadata(op.data.send_status_from_server.trailing_metadata,
- op.data.send_status_from_server.trailing_metadata_count,
- metadata.get());
- metadata->Set(GrpcStatusMetadata(),
- op.data.send_status_from_server.status);
- if (auto* details = op.data.send_status_from_server.status_details) {
- // TODO(ctiller): this should not be a copy, but we have callers that
- // allocate and pass in a slice created with
- // grpc_slice_from_static_string and then delete the string after
- // passing it in, which shouldn't be a supported API.
- metadata->Set(GrpcMessageMetadata(),
- Slice(grpc_slice_copy(*details)));
- }
- spawner.Spawn(
- "call_send_status_from_server",
- [this, metadata = std::move(metadata)]() mutable {
- bool r = true;
- if (send_trailing_metadata_.is_set()) {
- r = false;
- } else {
- send_trailing_metadata_.Set(std::move(metadata));
- }
- return Map(WaitForSendingStarted(), [this, r](Empty) {
- server_initial_metadata_->Close();
- server_to_client_messages_->Close();
- return r;
- });
- },
- [this, completion = AddOpToCompletion(
- completion, PendingOp::kSendStatusFromServer)](
- bool ok) mutable {
- if (!ok) {
- set_failed_before_recv_message();
- FailCompletion(completion);
- }
- FinishOpOnCompletion(&completion,
- PendingOp::kSendStatusFromServer);
- });
- } break;
- case GRPC_OP_RECV_CLOSE_ON_SERVER:
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[call] StartBatch: RecvClose %s",
- DebugTag().c_str(),
- recv_close_op_cancel_state_.ToString().c_str());
- }
- ForceCompletionSuccess(completion);
- recv_close_completion_ =
- AddOpToCompletion(completion, PendingOp::kReceiveCloseOnServer);
- if (recv_close_op_cancel_state_.ReceiveCloseOnServerOpStarted(
- op.data.recv_close_on_server.cancelled)) {
- FinishOpOnCompletion(&recv_close_completion_,
- PendingOp::kReceiveCloseOnServer);
- }
- break;
- case GRPC_OP_RECV_STATUS_ON_CLIENT:
- case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
- case GRPC_OP_RECV_INITIAL_METADATA:
- abort(); // unreachable
- }
- }
-}
-
-grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
- size_t nops,
- void* notify_tag,
- bool is_notify_tag_closure) {
- if (nops == 0) {
- EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
- return GRPC_CALL_OK;
- }
- const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
- if (validation_result != GRPC_CALL_OK) {
- return validation_result;
- }
- Completion completion =
- StartCompletion(notify_tag, is_notify_tag_closure, ops);
- CommitBatch(ops, nops, completion);
- FinishOpOnCompletion(&completion, PendingOp::kStartingBatch);
- return GRPC_CALL_OK;
-}
-
-void ServerPromiseBasedCall::CancelWithError(absl::Status error) {
- cancelled_.store(true, std::memory_order_relaxed);
- Spawn(
- "cancel_with_error",
- [this, error = std::move(error)]() {
- if (!send_trailing_metadata_.is_set()) {
- auto md = ServerMetadataFromStatus(error);
- md->Set(GrpcCallWasCancelled(), true);
- send_trailing_metadata_.Set(std::move(md));
- }
- if (server_to_client_messages_ != nullptr) {
- server_to_client_messages_->Close();
- }
- if (server_initial_metadata_ != nullptr) {
- server_initial_metadata_->Close();
- }
- return Empty{};
- },
- [](Empty) {});
-}
-#endif
-
-#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
-void ServerPromiseBasedCall::PublishInitialMetadata(
- ClientMetadataHandle metadata,
- grpc_metadata_array* publish_initial_metadata) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
- metadata->DebugString().c_str());
- }
- PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
- client_initial_metadata_ = std::move(metadata);
-}
-
-ArenaPromise
-ServerPromiseBasedCall::MakeTopOfServerCallPromise(
- CallArgs call_args, grpc_completion_queue* cq,
- absl::FunctionRef publish) {
- SetCompletionQueue(cq);
- call_args.polling_entity->Set(
- grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)));
- server_to_client_messages_ = call_args.server_to_client_messages;
- client_to_server_messages_ = call_args.client_to_server_messages;
- server_initial_metadata_ = call_args.server_initial_metadata;
- absl::optional deadline =
- client_initial_metadata_->get(GrpcTimeoutMetadata());
- if (deadline.has_value()) {
- set_send_deadline(*deadline);
- UpdateDeadline(*deadline);
- }
- ProcessIncomingInitialMetadata(*client_initial_metadata_);
- ExternalRef();
- publish(c_ptr());
- return Seq(server_to_client_messages_->AwaitClosed(),
- send_trailing_metadata_.Wait());
-}
-
-///////////////////////////////////////////////////////////////////////////////
-// CallSpine based Server Call
-
-class ServerCallSpine final : public PipeBasedCallSpine,
- public ServerCallContext,
- public BasicPromiseBasedCall {
+class ServerCall final : public Call, public DualRefCounted {
public:
- ServerCallSpine(ClientMetadataHandle client_initial_metadata,
- ServerInterface* server, Channel* channel, Arena* arena);
-
- // CallSpineInterface
- Pipe& client_initial_metadata() override {
- return client_initial_metadata_;
- }
- Pipe& server_initial_metadata() override {
- return server_initial_metadata_;
- }
- Pipe& client_to_server_messages() override {
- return client_to_server_messages_;
- }
- Pipe& server_to_client_messages() override {
- return server_to_client_messages_;
+ ServerCall(ClientMetadataHandle client_initial_metadata,
+ CallHandler call_handler, ServerInterface* server,
+ grpc_completion_queue* cq)
+ : Call(false,
+ client_initial_metadata->get(GrpcTimeoutMetadata())
+ .value_or(Timestamp::InfFuture()),
+ call_handler.event_engine()),
+ call_handler_(std::move(call_handler)),
+ client_initial_metadata_stored_(std::move(client_initial_metadata)),
+ cq_(cq),
+ server_(server) {
+ call_handler_.legacy_context()[GRPC_CONTEXT_CALL].value =
+ static_cast(this);
+ global_stats().IncrementServerCallsCreated();
}
- Latch& cancel_latch() override { return cancel_latch_; }
- Latch& was_cancelled_latch() override { return was_cancelled_latch_; }
- Party& party() override { return *this; }
- Arena* arena() override { return BasicPromiseBasedCall::arena(); }
- void IncrementRefCount() override { InternalRef("CallSpine"); }
- void Unref() override { InternalUnref("CallSpine"); }
- // PromiseBasedCall
- void OrphanCall() override {
- ResetDeadline();
- CancelWithError(absl::CancelledError());
- }
void CancelWithError(grpc_error_handle error) override {
- SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
- auto status = ServerMetadataFromStatus(error);
- status->Set(GrpcCallWasCancelled(), true);
- PushServerTrailingMetadata(std::move(status));
- return Empty{};
- });
+ call_handler_.SpawnInfallible(
+ "CancelWithError",
+ [self = WeakRefAsSubclass(), error = std::move(error)] {
+ auto status = ServerMetadataFromStatus(error);
+ status->Set(GrpcCallWasCancelled(), true);
+ self->call_handler_.PushServerTrailingMetadata(std::move(status));
+ return Empty{};
+ });
}
bool is_trailers_only() const override {
Crash("is_trailers_only not implemented for server calls");
@@ -3609,102 +3197,78 @@ class ServerCallSpine final : public PipeBasedCallSpine,
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) override;
- bool Completed() final { Crash("unimplemented"); }
- bool failed_before_recv_message() const final { Crash("unimplemented"); }
+ Arena* arena() override { return call_handler_.arena(); }
- ServerCallContext* server_call_context() override { return this; }
- const void* server_stream_data() override { Crash("unimplemented"); }
- void PublishInitialMetadata(
- ClientMetadataHandle metadata,
- grpc_metadata_array* publish_initial_metadata) override;
- ArenaPromise MakeTopOfServerCallPromise(
- CallArgs, grpc_completion_queue*,
- absl::FunctionRef) override {
- Crash("unimplemented");
+ grpc_event_engine::experimental::EventEngine* event_engine() const override {
+ return call_handler_.event_engine();
}
- void V2HackToStartCallWithoutACallFilterStack() override {}
+ void ExternalRef() override { Ref().release(); }
+ void ExternalUnref() override { Unref(); }
+ void InternalRef(const char*) override { WeakRef().release(); }
+ void InternalUnref(const char*) override { WeakUnref(); }
- ClientMetadata& UnprocessedClientInitialMetadata() override {
- Crash("not for v2");
+ void Orphaned() override {
+ // TODO(ctiller): only when we're not already finished
+ CancelWithError(absl::CancelledError());
}
- bool RunParty() override {
- ScopedContext ctx(this);
- return Party::RunParty();
+ void ContextSet(grpc_context_index elem, void* value,
+ void (*destroy)(void*)) override {
+ call_handler_.legacy_context()[elem] =
+ grpc_call_context_element{value, destroy};
+ }
+
+ void* ContextGet(grpc_context_index elem) const override {
+ return call_handler_.legacy_context()[elem].value;
+ }
+
+ void SetCompletionQueue(grpc_completion_queue*) override {
+ Crash("unimplemented");
+ }
+
+ grpc_compression_options compression_options() override {
+ return server_->compression_options();
}
+ grpc_call_stack* call_stack() override { return nullptr; }
+
+ char* GetPeer() override {
+ Slice peer_slice = GetPeerString();
+ if (!peer_slice.empty()) {
+ absl::string_view peer_string_view = peer_slice.as_string_view();
+ char* peer_string =
+ static_cast(gpr_malloc(peer_string_view.size() + 1));
+ memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
+ peer_string[peer_string_view.size()] = '\0';
+ return peer_string;
+ }
+ return gpr_strdup("unknown");
+ }
+
+ bool Completed() final { Crash("unimplemented"); }
+ bool failed_before_recv_message() const final { Crash("unimplemented"); }
+
private:
void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure);
- StatusFlag FinishRecvMessage(NextResult result);
+ StatusFlag FinishRecvMessage(
+ ValueOrFailure> result);
- std::string DebugTag() const override {
- return absl::StrFormat("SERVER_CALL_SPINE[%p]: ", this);
- }
-
- // Initial metadata from client to server
- Pipe client_initial_metadata_;
- // Initial metadata from server to client
- Pipe server_initial_metadata_;
- // Messages travelling from the application to the transport.
- Pipe client_to_server_messages_;
- // Messages travelling from the transport to the application.
- Pipe server_to_client_messages_;
- // Latch that can be set to terminate the call
- Latch cancel_latch_;
- Latch was_cancelled_latch_;
+ std::string DebugTag() { return absl::StrFormat("SERVER_CALL[%p]: ", this); }
+
+ CallHandler call_handler_;
grpc_byte_buffer** recv_message_ = nullptr;
ClientMetadataHandle client_initial_metadata_stored_;
+ grpc_completion_queue* const cq_;
+ ServerInterface* const server_;
};
-ServerCallSpine::ServerCallSpine(ClientMetadataHandle client_initial_metadata,
- ServerInterface* server, Channel* channel,
- Arena* arena)
- : BasicPromiseBasedCall(arena, 0, 1,
- [channel, server]() -> grpc_call_create_args {
- grpc_call_create_args args;
- args.channel = channel->Ref();
- args.server = server;
- args.parent = nullptr;
- args.propagation_mask = 0;
- args.cq = nullptr;
- args.pollset_set_alternative = nullptr;
- args.server_transport_data =
- &args; // Arbitrary non-null pointer
- args.send_deadline = Timestamp::InfFuture();
- return args;
- }()),
- client_initial_metadata_(arena),
- server_initial_metadata_(arena),
- client_to_server_messages_(arena),
- server_to_client_messages_(arena) {
- global_stats().IncrementServerCallsCreated();
- ScopedContext ctx(this);
- channel->channel_stack()->InitServerCallSpine(this);
- SpawnGuarded("push_client_initial_metadata",
- [this, md = std::move(client_initial_metadata)]() mutable {
- return Map(client_initial_metadata_.sender.Push(std::move(md)),
- [](bool r) { return StatusFlag(r); });
- });
-}
-
-void ServerCallSpine::PublishInitialMetadata(
- ClientMetadataHandle metadata,
- grpc_metadata_array* publish_initial_metadata) {
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
- metadata->DebugString().c_str());
- }
- PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
- client_initial_metadata_stored_ = std::move(metadata);
-}
-
-grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops,
- void* notify_tag,
- bool is_notify_tag_closure) {
+grpc_call_error ServerCall::StartBatch(const grpc_op* ops, size_t nops,
+ void* notify_tag,
+ bool is_notify_tag_closure) {
if (nops == 0) {
- EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
+ EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
return GRPC_CALL_OK;
}
const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
@@ -3716,63 +3280,86 @@ grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops,
}
namespace {
-template
+template
class MaybeOpImpl {
public:
- using SetupResult = decltype(std::declval()(grpc_op()));
using PromiseFactory = promise_detail::OncePromiseFactory;
using Promise = typename PromiseFactory::Promise;
- struct Dismissed {};
- using State = absl::variant;
+ static_assert(!std::is_same::value,
+ "PromiseFactory must return a promise");
+
+ MaybeOpImpl() : state_(State::kDismissed) {}
+ explicit MaybeOpImpl(SetupResult result) : state_(State::kPromiseFactory) {
+ Construct(&promise_factory_, std::move(result));
+ }
- // op_ is garbage but shouldn't be uninitialized
- MaybeOpImpl() : state_(Dismissed{}), op_(GRPC_OP_RECV_STATUS_ON_CLIENT) {}
- MaybeOpImpl(SetupResult result, grpc_op_type op)
- : state_(PromiseFactory(std::move(result))), op_(op) {}
+ ~MaybeOpImpl() {
+ switch (state_) {
+ case State::kDismissed:
+ break;
+ case State::kPromiseFactory:
+ Destruct(&promise_factory_);
+ break;
+ case State::kPromise:
+ Destruct(&promise_);
+ break;
+ }
+ }
MaybeOpImpl(const MaybeOpImpl&) = delete;
MaybeOpImpl& operator=(const MaybeOpImpl&) = delete;
- MaybeOpImpl(MaybeOpImpl&& other) noexcept
- : state_(MoveState(other.state_)), op_(other.op_) {}
- MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept {
- op_ = other.op_;
- if (absl::holds_alternative(state_)) {
- state_.template emplace();
- return *this;
+ MaybeOpImpl(MaybeOpImpl&& other) noexcept : state_(other.state_) {
+ switch (state_) {
+ case State::kDismissed:
+ break;
+ case State::kPromiseFactory:
+ Construct(&promise_factory_, std::move(other.promise_factory_));
+ break;
+ case State::kPromise:
+ Construct(&promise_, std::move(other.promise_));
+ break;
}
- // Can't move after first poll => Promise is not an option
- state_.template emplace(
- std::move(absl::get(other.state_)));
- return *this;
}
+ MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept = delete;
Poll operator()() {
- if (absl::holds_alternative(state_)) return Success{};
- if (absl::holds_alternative(state_)) {
- auto& factory = absl::get(state_);
- auto promise = factory.Make();
- state_.template emplace(std::move(promise));
- }
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%sBeginPoll %s",
- Activity::current()->DebugTag().c_str(), OpName(op_).c_str());
- }
- auto& promise = absl::get(state_);
- auto r = poll_cast(promise());
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_INFO, "%sEndPoll %s --> %s",
- Activity::current()->DebugTag().c_str(), OpName(op_).c_str(),
+ switch (state_) {
+ case State::kDismissed:
+ return Success{};
+ case State::kPromiseFactory: {
+ auto promise = promise_factory_.Make();
+ Destruct(&promise_factory_);
+ Construct(&promise_, std::move(promise));
+ state_ = State::kPromise;
+ }
+ ABSL_FALLTHROUGH_INTENDED;
+ case State::kPromise: {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO, "%sBeginPoll %s",
+ Activity::current()->DebugTag().c_str(), OpName());
+ }
+ auto r = poll_cast(promise_());
+ if (grpc_call_trace.enabled()) {
+ gpr_log(
+ GPR_INFO, "%sEndPoll %s --> %s",
+ Activity::current()->DebugTag().c_str(), OpName(),
r.pending() ? "PENDING" : (r.value().ok() ? "OK" : "FAILURE"));
+ }
+ return r;
+ }
}
- return r;
+ GPR_UNREACHABLE_CODE(return Pending{});
}
private:
- GPR_NO_UNIQUE_ADDRESS State state_;
- GPR_NO_UNIQUE_ADDRESS grpc_op_type op_;
+ enum class State {
+ kDismissed,
+ kPromiseFactory,
+ kPromise,
+ };
- static std::string OpName(grpc_op_type op) {
- switch (op) {
+ static const char* OpName() {
+ switch (kOp) {
case GRPC_OP_SEND_INITIAL_METADATA:
return "SendInitialMetadata";
case GRPC_OP_SEND_MESSAGE:
@@ -3790,30 +3377,34 @@ class MaybeOpImpl {
case GRPC_OP_RECV_STATUS_ON_CLIENT:
return "RecvStatusOnClient";
}
- return absl::StrCat("UnknownOp(", op, ")");
+ Crash("Unreachable");
}
- static State MoveState(State& state) {
- if (absl::holds_alternative(state)) return Dismissed{};
- // Can't move after first poll => Promise is not an option
- return std::move(absl::get(state));
- }
+ // gcc-12 has problems with this being a variant
+ GPR_NO_UNIQUE_ADDRESS State state_;
+ union {
+ PromiseFactory promise_factory_;
+ Promise promise_;
+ };
};
-// MaybeOp captures a fairly complicated dance we need to do for the batch API.
-// We first check if an op is included or not, and if it is, we run the setup
-// function in the context of the API call (NOT in the call party).
-// This setup function returns a promise factory which we'll then run *in* the
+// MaybeOp captures a fairly complicated dance we need to do for the batch
+// API. We first check if an op is included or not, and if it is, we run the
+// setup function in the context of the API call (NOT in the call party). This
+// setup function returns a promise factory which we'll then run *in* the
// party to do initial setup, and have it return the promise that we'll
// ultimately poll on til completion.
// Once we express our surface API in terms of core internal types this whole
// dance will go away.
-template
-auto MaybeOp(const grpc_op* ops, uint8_t idx, SetupFn setup) {
- if (idx == 255) {
- return MaybeOpImpl();
+template
+auto MaybeOp(const grpc_op* ops, const std::array& idxs,
+ SetupFn setup) {
+ using SetupResult = decltype(std::declval()(grpc_op()));
+ if (idxs[op_type] == 255) {
+ return MaybeOpImpl();
} else {
- return MaybeOpImpl(setup(ops[idx]), ops[idx].op);
+ auto r = setup(ops[idxs[op_type]]);
+ return MaybeOpImpl(std::move(r));
}
}
@@ -3851,63 +3442,61 @@ PollBatchLogger LogPollBatch(void* tag, F f) {
}
} // namespace
-StatusFlag ServerCallSpine::FinishRecvMessage(
- NextResult result) {
- if (result.has_value()) {
- MessageHandle& message = *result;
- NoteLastMessageFlags(message->flags());
- if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
- *recv_message_ = grpc_raw_compressed_byte_buffer_create(
- nullptr, 0, incoming_compression_algorithm());
- } else {
- *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
- }
- grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
- &(*recv_message_)->data.raw.slice_buffer);
+StatusFlag ServerCall::FinishRecvMessage(
+ ValueOrFailure> result) {
+ if (!result.ok()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
- "finishes: received %" PRIdPTR " byte message",
- DebugTag().c_str(),
- (*recv_message_)->data.raw.slice_buffer.length);
+ "finishes: received end-of-stream with error",
+ DebugTag().c_str());
}
+ *recv_message_ = nullptr;
recv_message_ = nullptr;
- return Success{};
+ return Failure{};
}
- if (result.cancelled()) {
+ if (!result->has_value()) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
- "finishes: received end-of-stream with error",
+ "finishes: received end-of-stream",
DebugTag().c_str());
}
*recv_message_ = nullptr;
recv_message_ = nullptr;
- return Failure{};
+ return Success{};
}
+ MessageHandle& message = **result;
+ NoteLastMessageFlags(message->flags());
+ if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
+ *recv_message_ = grpc_raw_compressed_byte_buffer_create(
+ nullptr, 0, incoming_compression_algorithm());
+ } else {
+ *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
+ }
+ grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
+ &(*recv_message_)->data.raw.slice_buffer);
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[call] RecvMessage: outstanding_recv "
- "finishes: received end-of-stream",
- DebugTag().c_str());
+ "finishes: received %" PRIdPTR " byte message",
+ DebugTag().c_str(), (*recv_message_)->data.raw.slice_buffer.length);
}
- *recv_message_ = nullptr;
recv_message_ = nullptr;
return Success{};
}
-void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
- void* notify_tag,
- bool is_notify_tag_closure) {
+void ServerCall::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
+ bool is_notify_tag_closure) {
std::array got_ops{255, 255, 255, 255, 255, 255, 255, 255};
for (size_t op_idx = 0; op_idx < nops; op_idx++) {
const grpc_op& op = ops[op_idx];
got_ops[op.op] = op_idx;
}
- if (!is_notify_tag_closure) grpc_cq_begin_op(cq(), notify_tag);
- auto send_initial_metadata = MaybeOp(
- ops, got_ops[GRPC_OP_SEND_INITIAL_METADATA], [this](const grpc_op& op) {
+ if (!is_notify_tag_closure) grpc_cq_begin_op(cq_, notify_tag);
+ auto send_initial_metadata = MaybeOp(
+ ops, got_ops, [this](const grpc_op& op) {
auto metadata = arena()->MakePooled();
PrepareOutgoingInitialMetadata(op, *metadata);
CToMetadata(op.data.send_initial_metadata.metadata,
@@ -3917,27 +3506,22 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
DebugTag().c_str());
}
return [this, metadata = std::move(metadata)]() mutable {
- return Map(server_initial_metadata_.sender.Push(std::move(metadata)),
- [this](bool r) {
- server_initial_metadata_.sender.Close();
- return StatusFlag(r);
- });
+ return call_handler_.PushServerInitialMetadata(std::move(metadata));
};
});
auto send_message =
- MaybeOp(ops, got_ops[GRPC_OP_SEND_MESSAGE], [this](const grpc_op& op) {
+ MaybeOp(ops, got_ops, [this](const grpc_op& op) {
SliceBuffer send;
grpc_slice_buffer_swap(
&op.data.send_message.send_message->data.raw.slice_buffer,
send.c_slice_buffer());
auto msg = arena()->MakePooled(std::move(send), op.flags);
return [this, msg = std::move(msg)]() mutable {
- return Map(server_to_client_messages_.sender.Push(std::move(msg)),
- [](bool r) { return StatusFlag(r); });
+ return call_handler_.PushMessage(std::move(msg));
};
});
- auto send_trailing_metadata = MaybeOp(
- ops, got_ops[GRPC_OP_SEND_STATUS_FROM_SERVER], [this](const grpc_op& op) {
+ auto send_trailing_metadata = MaybeOp(
+ ops, got_ops, [this](const grpc_op& op) {
auto metadata = arena()->MakePooled();
CToMetadata(op.data.send_status_from_server.trailing_metadata,
op.data.send_status_from_server.trailing_metadata_count,
@@ -3958,18 +3542,18 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
return [this,
metadata = std::move(metadata)]() mutable -> Poll {
CHECK(metadata != nullptr);
- PushServerTrailingMetadata(std::move(metadata));
+ call_handler_.PushServerTrailingMetadata(std::move(metadata));
return Success{};
};
};
});
auto recv_message =
- MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) {
+ MaybeOp(ops, got_ops, [this](const grpc_op& op) {
CHECK_EQ(recv_message_, nullptr);
recv_message_ = op.data.recv_message.recv_message;
return [this]() mutable {
- return Map(client_to_server_messages_.receiver.Next(),
- [this](NextResult msg) {
+ return Map(call_handler_.PullMessage(),
+ [this](ValueOrFailure> msg) {
return FinishRecvMessage(std::move(msg));
});
};
@@ -3980,10 +3564,10 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
std::move(send_trailing_metadata)),
std::move(recv_message));
if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
- auto recv_trailing_metadata = MaybeOp(
- ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
+ auto recv_trailing_metadata = MaybeOp(
+ ops, got_ops, [this](const grpc_op& op) {
return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
- return Map(WasCancelled(),
+ return Map(call_handler_.WasCancelled(),
[cancelled, this](bool result) -> Success {
ResetDeadline();
*cancelled = result ? 1 : 0;
@@ -3991,7 +3575,7 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
});
};
});
- SpawnInfallible(
+ call_handler_.SpawnInfallible(
"final-batch",
[primary_ops = std::move(primary_ops),
recv_trailing_metadata = std::move(recv_trailing_metadata),
@@ -4001,37 +3585,38 @@ void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
Seq(std::move(primary_ops), std::move(recv_trailing_metadata),
[is_notify_tag_closure, notify_tag, this](StatusFlag) {
return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
- absl::OkStatus(), cq());
+ absl::OkStatus(), cq_);
}));
});
} else {
- SpawnInfallible("batch", [primary_ops = std::move(primary_ops),
- is_notify_tag_closure, notify_tag,
- this]() mutable {
- return LogPollBatch(
- notify_tag,
- Seq(std::move(primary_ops),
- [is_notify_tag_closure, notify_tag, this](StatusFlag r) {
+ call_handler_.SpawnInfallible(
+ "batch", [primary_ops = std::move(primary_ops), is_notify_tag_closure,
+ notify_tag, this]() mutable {
+ return LogPollBatch(
+ notify_tag,
+ Seq(std::move(primary_ops), [is_notify_tag_closure, notify_tag,
+ this](StatusFlag r) {
return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
- StatusCast(r), cq());
+ StatusCast(r), cq_);
}));
- });
+ });
}
}
-RefCountedPtr MakeServerCall(
- ClientMetadataHandle client_initial_metadata, ServerInterface* server,
- Channel* channel, Arena* arena) {
- return RefCountedPtr(arena->New(
- std::move(client_initial_metadata), server, channel, arena));
+grpc_call* MakeServerCall(CallHandler call_handler,
+ ClientMetadataHandle client_initial_metadata,
+ ServerInterface* server, grpc_completion_queue* cq,
+ grpc_metadata_array* publish_initial_metadata) {
+ PublishMetadataArray(client_initial_metadata.get(), publish_initial_metadata,
+ false);
+ // TODO(ctiller): ideally we'd put this in the arena with the CallHandler,
+ // but there's an ownership problem: CallHandler owns the arena, and so would
+ // get destroyed before the base class Call destructor runs, leading to
+ // UB/crash. Investigate another path.
+ return (new ServerCall(std::move(client_initial_metadata),
+ std::move(call_handler), server, cq))
+ ->c_ptr();
}
-#else
-RefCountedPtr MakeServerCall(ClientMetadataHandle,
- ServerInterface*, Channel*,
- Arena*) {
- Crash("not implemented");
-}
-#endif
} // namespace grpc_core
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 9fb12f279e4..9388c142f67 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -82,7 +82,7 @@ class Call : public CppImplOf,
public grpc_event_engine::experimental::EventEngine::
Closure /* for deadlines */ {
public:
- Arena* arena() { return arena_; }
+ virtual Arena* arena() = 0;
bool is_client() const { return is_client_; }
virtual void ContextSet(grpc_context_index elem, void* value,
@@ -92,7 +92,7 @@ class Call : public CppImplOf,
void CancelWithStatus(grpc_status_code status, const char* description);
virtual void CancelWithError(grpc_error_handle error) = 0;
virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
- char* GetPeer();
+ virtual char* GetPeer() = 0;
virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
void* notify_tag,
bool is_notify_tag_closure) = 0;
@@ -157,25 +157,15 @@ class Call : public CppImplOf,
Call* sibling_prev = nullptr;
};
- Call(Arena* arena, bool is_client, Timestamp send_deadline,
- RefCountedPtr channel)
- : channel_(std::move(channel)),
- arena_(arena),
- send_deadline_(send_deadline),
- is_client_(is_client) {
- DCHECK_NE(arena_, nullptr);
- DCHECK(channel_ != nullptr);
- }
+ Call(bool is_client, Timestamp send_deadline,
+ grpc_event_engine::experimental::EventEngine* event_engine)
+ : send_deadline_(send_deadline),
+ is_client_(is_client),
+ event_engine_(event_engine) {}
~Call() override = default;
- void DeleteThis();
-
ParentCall* GetOrCreateParentCall();
ParentCall* parent_call();
- Channel* channel() const {
- DCHECK(channel_ != nullptr);
- return channel_.get();
- }
absl::Status InitParent(Call* parent, uint32_t propagation_mask);
void PublishToParent(Call* parent);
@@ -221,9 +211,9 @@ class Call : public CppImplOf,
gpr_cycle_counter start_time() const { return start_time_; }
+ virtual grpc_compression_options compression_options() = 0;
+
private:
- RefCountedPtr channel_;
- Arena* const arena_;
std::atomic parent_call_{nullptr};
ChildCall* child_ = nullptr;
Timestamp send_deadline_;
@@ -247,34 +237,13 @@ class Call : public CppImplOf,
Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
deadline_mu_) deadline_task_;
+ grpc_event_engine::experimental::EventEngine* const event_engine_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
};
class BasicPromiseBasedCall;
class ServerPromiseBasedCall;
-class ServerCallContext {
- public:
- virtual void PublishInitialMetadata(
- ClientMetadataHandle metadata,
- grpc_metadata_array* publish_initial_metadata) = 0;
-
- // Construct the top of the server call promise for the v2 filter stack.
- // TODO(ctiller): delete when v3 is available.
- virtual ArenaPromise MakeTopOfServerCallPromise(
- CallArgs call_args, grpc_completion_queue* cq,
- absl::FunctionRef publish) = 0;
-
- // Server stream data as supplied by the transport (so we can link the
- // transport stream up with the call again).
- // TODO(ctiller): legacy API - once we move transports to promises we'll
- // create the promise directly and not need to pass around this token.
- virtual const void* server_stream_data() = 0;
-
- protected:
- ~ServerCallContext() = default;
-};
-
// TODO(ctiller): move more call things into this type
class CallContext {
public:
@@ -300,8 +269,6 @@ class CallContext {
gpr_atm* peer_string_atm_ptr();
gpr_cycle_counter call_start_time() { return start_time_; }
- ServerCallContext* server_call_context();
-
void set_traced(bool traced) { traced_ = traced; }
bool traced() const { return traced_; }
@@ -329,9 +296,10 @@ template <>
struct ContextType {};
// TODO(ctiller): remove once call-v3 finalized
-RefCountedPtr MakeServerCall(
- ClientMetadataHandle client_initial_metadata, ServerInterface* server,
- Channel* channel, Arena* arena);
+grpc_call* MakeServerCall(CallHandler call_handler,
+ ClientMetadataHandle client_initial_metadata,
+ ServerInterface* server, grpc_completion_queue* cq,
+ grpc_metadata_array* publish_initial_metadata);
} // namespace grpc_core
diff --git a/src/core/lib/surface/channel_init.cc b/src/core/lib/surface/channel_init.cc
index b94b189ffea..698a57b195d 100644
--- a/src/core/lib/surface/channel_init.cc
+++ b/src/core/lib/surface/channel_init.cc
@@ -104,9 +104,9 @@ ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
grpc_channel_stack_type type, const grpc_channel_filter* filter,
- const ChannelFilterVtable* vtable, SourceLocation registration_source) {
+ FilterAdder filter_adder, SourceLocation registration_source) {
filters_[type].emplace_back(std::make_unique(
- filter, vtable, registration_source));
+ filter, filter_adder, registration_source));
return *filters_[type].back();
}
@@ -223,9 +223,10 @@ ChannelInit::StackConfig ChannelInit::BuildStackConfig(
while (!dependencies.empty()) {
auto filter = take_ready_dependency();
auto* registration = filter_to_registration[filter];
- filters.emplace_back(
- filter, registration->vtable_, std::move(registration->predicates_),
- registration->skip_v3_, registration->registration_source_);
+ filters.emplace_back(filter, registration->filter_adder_,
+ std::move(registration->predicates_),
+ registration->skip_v3_,
+ registration->registration_source_);
for (auto& p : dependencies) {
p.second.erase(filter);
}
@@ -406,78 +407,21 @@ bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
return true;
}
-absl::StatusOr ChannelInit::CreateStackSegment(
- grpc_channel_stack_type type, const ChannelArgs& args) const {
+void ChannelInit::AddToInterceptionChainBuilder(
+ grpc_channel_stack_type type, InterceptionChainBuilder& builder) const {
const auto& stack_config = stack_configs_[type];
- std::vector filters;
- size_t channel_data_size = 0;
- size_t channel_data_alignment = 0;
// Based on predicates build a list of filters to include in this segment.
for (const auto& filter : stack_config.filters) {
if (filter.skip_v3) continue;
- if (!filter.CheckPredicates(args)) continue;
- if (filter.vtable == nullptr) {
- return absl::InvalidArgumentError(
+ if (!filter.CheckPredicates(builder.channel_args())) continue;
+ if (filter.filter_adder == nullptr) {
+ builder.Fail(absl::InvalidArgumentError(
absl::StrCat("Filter ", NameFromChannelFilter(filter.filter),
- " has no v3-callstack vtable"));
+ " has no v3-callstack vtable")));
+ return;
}
- channel_data_alignment =
- std::max(channel_data_alignment, filter.vtable->alignment);
- if (channel_data_size % filter.vtable->alignment != 0) {
- channel_data_size += filter.vtable->alignment -
- (channel_data_size % filter.vtable->alignment);
- }
- filters.push_back({channel_data_size, filter.vtable});
- channel_data_size += filter.vtable->size;
- }
- // Shortcut for empty segments.
- if (filters.empty()) return StackSegment();
- // Allocate memory for the channel data, initialize channel filters into it.
- uint8_t* p = static_cast(
- gpr_malloc_aligned(channel_data_size, channel_data_alignment));
- for (size_t i = 0; i < filters.size(); i++) {
- auto r = filters[i].vtable->init(p + filters[i].offset, args);
- if (!r.ok()) {
- for (size_t j = 0; j < i; j++) {
- filters[j].vtable->destroy(p + filters[j].offset);
- }
- gpr_free_aligned(p);
- return r;
- }
- }
- return StackSegment(std::move(filters), p);
-}
-
-///////////////////////////////////////////////////////////////////////////////
-// ChannelInit::StackSegment
-
-ChannelInit::StackSegment::StackSegment(std::vector filters,
- uint8_t* channel_data)
- : data_(MakeRefCounted(std::move(filters), channel_data)) {}
-
-void ChannelInit::StackSegment::AddToCallFilterStack(
- CallFilters::StackBuilder& builder) {
- if (data_ == nullptr) return;
- data_->AddToCallFilterStack(builder);
- builder.AddOwnedObject(data_);
-};
-
-ChannelInit::StackSegment::ChannelData::ChannelData(
- std::vector filters, uint8_t* channel_data)
- : filters_(std::move(filters)), channel_data_(channel_data) {}
-
-void ChannelInit::StackSegment::ChannelData::AddToCallFilterStack(
- CallFilters::StackBuilder& builder) {
- for (const auto& filter : filters_) {
- filter.vtable->add_to_stack_builder(channel_data_ + filter.offset, builder);
- }
-}
-
-ChannelInit::StackSegment::ChannelData::~ChannelData() {
- for (const auto& filter : filters_) {
- filter.vtable->destroy(channel_data_ + filter.offset);
+ filter.filter_adder(builder);
}
- gpr_free_aligned(channel_data_);
}
} // namespace grpc_core
diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h
index fcfef582994..c5067394014 100644
--- a/src/core/lib/surface/channel_init.h
+++ b/src/core/lib/surface/channel_init.h
@@ -38,6 +38,7 @@
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/call_filters.h"
+#include "src/core/lib/transport/interception_chain.h"
/// This module provides a way for plugins (and the grpc core library itself)
/// to register mutators for channel stacks.
@@ -68,6 +69,8 @@ class ChannelInit {
using InclusionPredicate = absl::AnyInvocable;
// Post processor for the channel stack - applied in PostProcessorSlot order
using PostProcessor = absl::AnyInvocable;
+ // Function that can be called to add a filter to a stack builder
+ using FilterAdder = void (*)(InterceptionChainBuilder&);
// Post processing slots - up to one PostProcessor per slot can be registered
// They run after filters registered are added to the channel stack builder,
// but before Build is called - allowing ad-hoc mutation to the channel stack.
@@ -77,25 +80,15 @@ class ChannelInit {
kCount
};
- // Vtable-like data structure for channel data initialization
- struct ChannelFilterVtable {
- size_t size;
- size_t alignment;
- absl::Status (*init)(void* data, const ChannelArgs& args);
- void (*destroy)(void* data);
- void (*add_to_stack_builder)(void* data,
- CallFilters::StackBuilder& builder);
- };
-
class FilterRegistration {
public:
// TODO(ctiller): Remove grpc_channel_filter* arg when that can be
// deprecated (once filter stack is removed).
explicit FilterRegistration(const grpc_channel_filter* filter,
- const ChannelFilterVtable* vtable,
+ FilterAdder filter_adder,
SourceLocation registration_source)
: filter_(filter),
- vtable_(vtable),
+ filter_adder_(filter_adder),
registration_source_(registration_source) {}
FilterRegistration(const FilterRegistration&) = delete;
FilterRegistration& operator=(const FilterRegistration&) = delete;
@@ -170,7 +163,7 @@ class ChannelInit {
private:
friend class ChannelInit;
const grpc_channel_filter* const filter_;
- const ChannelFilterVtable* const vtable_;
+ const FilterAdder filter_adder_;
std::vector after_;
std::vector before_;
std::vector predicates_;
@@ -188,16 +181,17 @@ class ChannelInit {
// properties of the filter being registered.
// TODO(ctiller): remove in favor of the version that does not mention
// grpc_channel_filter
- FilterRegistration& RegisterFilter(
- grpc_channel_stack_type type, const grpc_channel_filter* filter,
- const ChannelFilterVtable* vtable = nullptr,
- SourceLocation registration_source = {});
+ FilterRegistration& RegisterFilter(grpc_channel_stack_type type,
+ const grpc_channel_filter* filter,
+ FilterAdder filter_adder = nullptr,
+ SourceLocation registration_source = {});
template
FilterRegistration& RegisterFilter(
grpc_channel_stack_type type, SourceLocation registration_source = {}) {
- return RegisterFilter(type, &Filter::kFilter,
- VtableForType::vtable(),
- registration_source);
+ return RegisterFilter(
+ type, &Filter::kFilter,
+ [](InterceptionChainBuilder& builder) { builder.Add(); },
+ registration_source);
}
// Filter does not participate in v3
@@ -232,58 +226,13 @@ class ChannelInit {
[static_cast(PostProcessorSlot::kCount)];
};
- // A set of channel filters that can be added to a call stack.
- // TODO(ctiller): move this out so it can be used independently of
- // the global registration mechanisms.
- class StackSegment final {
- public:
- // Registration of one channel filter in the stack.
- struct ChannelFilter {
- size_t offset;
- const ChannelFilterVtable* vtable;
- };
-
- StackSegment() = default;
- explicit StackSegment(std::vector filters,
- uint8_t* channel_data);
- StackSegment(const StackSegment& other) = delete;
- StackSegment& operator=(const StackSegment& other) = delete;
- StackSegment(StackSegment&& other) noexcept = default;
- StackSegment& operator=(StackSegment&& other) = default;
-
- // Add this segment to a call filter stack builder
- void AddToCallFilterStack(CallFilters::StackBuilder& builder);
-
- private:
- // Combined channel data for the stack
- class ChannelData : public RefCounted {
- public:
- explicit ChannelData(std::vector filters,
- uint8_t* channel_data);
- ~ChannelData() override;
-
- void AddToCallFilterStack(CallFilters::StackBuilder& builder);
-
- private:
- std::vector filters_;
- uint8_t* channel_data_;
- };
-
- RefCountedPtr data_;
- };
-
/// Construct a channel stack of some sort: see channel_stack.h for details
/// \a builder is the channel stack builder to build into.
GRPC_MUST_USE_RESULT
bool CreateStack(ChannelStackBuilder* builder) const;
- // Create a segment of a channel stack.
- // Terminators and post processors are not included in this construction:
- // terminators are a legacy filter-stack concept, and post processors
- // need to migrate to other mechanisms.
- // TODO(ctiller): figure out other mechanisms.
- absl::StatusOr CreateStackSegment(
- grpc_channel_stack_type type, const ChannelArgs& args) const;
+ void AddToInterceptionChainBuilder(grpc_channel_stack_type type,
+ InterceptionChainBuilder& builder) const;
private:
// The type of object returned by a filter's Create method.
@@ -292,16 +241,16 @@ class ChannelInit {
typename decltype(T::Create(ChannelArgs(), {}))::value_type;
struct Filter {
- Filter(const grpc_channel_filter* filter, const ChannelFilterVtable* vtable,
+ Filter(const grpc_channel_filter* filter, FilterAdder filter_adder,
std::vector predicates, bool skip_v3,
SourceLocation registration_source)
: filter(filter),
- vtable(vtable),
+ filter_adder(filter_adder),
predicates(std::move(predicates)),
registration_source(registration_source),
skip_v3(skip_v3) {}
const grpc_channel_filter* filter;
- const ChannelFilterVtable* vtable;
+ const FilterAdder filter_adder;
std::vector predicates;
SourceLocation registration_source;
bool skip_v3 = false;
@@ -313,17 +262,6 @@ class ChannelInit {
std::vector post_processors;
};
- template
- struct VtableForType {
- static const ChannelFilterVtable* vtable() { return nullptr; }
- };
-
- template
- struct VtableForType> {
- static const ChannelFilterVtable kVtable;
- static const ChannelFilterVtable* vtable() { return &kVtable; }
- };
-
StackConfig stack_configs_[GRPC_NUM_CHANNEL_STACK_TYPES];
static StackConfig BuildStackConfig(
@@ -331,22 +269,6 @@ class ChannelInit {
PostProcessor* post_processors, grpc_channel_stack_type type);
};
-template
-const ChannelInit::ChannelFilterVtable
- ChannelInit::VtableForType>::kVtable = {
- sizeof(CreatedType), alignof(CreatedType),
- [](void* data, const ChannelArgs& args) -> absl::Status {
- // TODO(ctiller): fill in ChannelFilter::Args (2nd arg)
- absl::StatusOr> r = T::Create(args, {});
- if (!r.ok()) return r.status();
- new (data) CreatedType(std::move(*r));
- return absl::OkStatus();
- },
- [](void* data) { Destruct(static_cast*>(data)); },
- [](void* data, CallFilters::StackBuilder& builder) {
- builder.Add(static_cast*>(data)->get());
- }};
-
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_SURFACE_CHANNEL_INIT_H
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 3ff4c77f062..b2a766ac8a0 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -66,10 +66,10 @@ static bool g_shutting_down ABSL_GUARDED_BY(g_init_mu) = false;
namespace grpc_core {
void RegisterSecurityFilters(CoreConfiguration::Builder* builder) {
builder->channel_init()
- ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL)
+ ->RegisterV2Filter(GRPC_CLIENT_SUBCHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
- ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL)
+ ->RegisterV2Filter(GRPC_CLIENT_DIRECT_CHANNEL)
.IfHasChannelArg(GRPC_ARG_SECURITY_CONNECTOR);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL)
diff --git a/src/core/lib/transport/call_destination.h b/src/core/lib/transport/call_destination.h
index 77683e230f5..938e0fd3725 100644
--- a/src/core/lib/transport/call_destination.h
+++ b/src/core/lib/transport/call_destination.h
@@ -51,6 +51,24 @@ class CallDestination : public DualRefCounted {
virtual void HandleCall(CallHandler unstarted_call_handler) = 0;
};
+template
+auto MakeCallDestinationFromHandlerFunction(HC handle_call) {
+ class Impl : public CallDestination {
+ public:
+ explicit Impl(HC handle_call) : handle_call_(std::move(handle_call)) {}
+
+ void Orphaned() override {}
+
+ void HandleCall(CallHandler call_handler) override {
+ handle_call_(std::move(call_handler));
+ }
+
+ private:
+ HC handle_call_;
+ };
+ return MakeRefCounted(std::move(handle_call));
+}
+
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H
diff --git a/src/core/lib/transport/call_filters.h b/src/core/lib/transport/call_filters.h
index a9e3a1c5b98..d637b6fb0de 100644
--- a/src/core/lib/transport/call_filters.h
+++ b/src/core/lib/transport/call_filters.h
@@ -1245,6 +1245,42 @@ const NoInterceptor
template
const NoInterceptor ServerTrailingMetadataInterceptor::Call::OnFinalize;
+template
+class ClientInitialMetadataInterceptor {
+ public:
+ class Call {
+ public:
+ auto OnClientInitialMetadata(ClientMetadata& md,
+ ClientInitialMetadataInterceptor* filter) {
+ return filter->fn_(md);
+ }
+ static const NoInterceptor OnServerInitialMetadata;
+ static const NoInterceptor OnClientToServerMessage;
+ static const NoInterceptor OnServerToClientMessage;
+ static const NoInterceptor OnServerTrailingMetadata;
+ static const NoInterceptor OnFinalize;
+ };
+
+ explicit ClientInitialMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {}
+
+ private:
+ GPR_NO_UNIQUE_ADDRESS Fn fn_;
+};
+template
+const NoInterceptor
+ ClientInitialMetadataInterceptor::Call::OnServerInitialMetadata;
+template
+const NoInterceptor
+ ClientInitialMetadataInterceptor::Call::OnClientToServerMessage;
+template
+const NoInterceptor
+ ClientInitialMetadataInterceptor::Call::OnServerToClientMessage;
+template
+const NoInterceptor
+ ClientInitialMetadataInterceptor::Call::OnServerTrailingMetadata;
+template
+const NoInterceptor ClientInitialMetadataInterceptor::Call::OnFinalize;
+
} // namespace filters_detail
// Execution environment for a stack of filters.
@@ -1302,6 +1338,14 @@ class CallFilters {
AddOwnedObject([](void* p) { delete static_cast(p); }, p.release());
}
+ template
+ void AddOnClientInitialMetadata(Fn fn) {
+ auto filter = std::make_unique<
+ filters_detail::ClientInitialMetadataInterceptor>(std::move(fn));
+ Add(filter.get());
+ AddOwnedObject(std::move(filter));
+ }
+
template