diff --git a/BUILD b/BUILD
index 2d7d69fcb44..3cbb84fcc1d 100644
--- a/BUILD
+++ b/BUILD
@@ -1464,6 +1464,7 @@ grpc_cc_library(
"//src/core:lib/surface/lame_client.h",
"//src/core:lib/surface/server.h",
"//src/core:lib/surface/validate_metadata.h",
+ "//src/core:lib/surface/wait_for_cq_end_op.h",
"//src/core:lib/transport/batch_builder.h",
"//src/core:lib/transport/connectivity_state.h",
"//src/core:lib/transport/custom_metadata.h",
@@ -1540,6 +1541,7 @@ grpc_cc_library(
"work_serializer",
"//src/core:1999",
"//src/core:activity",
+ "//src/core:all_ok",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:atomic_utils",
@@ -1612,6 +1614,7 @@ grpc_cc_library(
"//src/core:thread_quota",
"//src/core:time",
"//src/core:transport_fwd",
+ "//src/core:try_join",
"//src/core:try_seq",
"//src/core:type_list",
"//src/core:useful",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 60817e90943..a35b971bd1b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23456,8 +23456,8 @@ target_include_directories(status_flag_test
target_link_libraries(status_flag_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
- absl::status
absl::statusor
+ gpr
)
diff --git a/Package.swift b/Package.swift
index e2c68c7eefe..7fa98e64126 100644
--- a/Package.swift
+++ b/Package.swift
@@ -1634,10 +1634,12 @@ let package = Package(
"src/core/lib/matchers/matchers.h",
"src/core/lib/promise/activity.cc",
"src/core/lib/promise/activity.h",
+ "src/core/lib/promise/all_ok.h",
"src/core/lib/promise/arena_promise.h",
"src/core/lib/promise/cancel_callback.h",
"src/core/lib/promise/context.h",
"src/core/lib/promise/detail/basic_seq.h",
+ "src/core/lib/promise/detail/join_state.h",
"src/core/lib/promise/detail/promise_factory.h",
"src/core/lib/promise/detail/promise_like.h",
"src/core/lib/promise/detail/seq_state.h",
@@ -1662,6 +1664,7 @@ let package = Package(
"src/core/lib/promise/status_flag.h",
"src/core/lib/promise/trace.cc",
"src/core/lib/promise/trace.h",
+ "src/core/lib/promise/try_join.h",
"src/core/lib/promise/try_seq.h",
"src/core/lib/resolver/endpoint_addresses.cc",
"src/core/lib/resolver/endpoint_addresses.h",
@@ -1869,6 +1872,7 @@ let package = Package(
"src/core/lib/surface/validate_metadata.cc",
"src/core/lib/surface/validate_metadata.h",
"src/core/lib/surface/version.cc",
+ "src/core/lib/surface/wait_for_cq_end_op.h",
"src/core/lib/transport/batch_builder.cc",
"src/core/lib/transport/batch_builder.h",
"src/core/lib/transport/bdp_estimator.cc",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 71a87f34adb..2e7cd202f5c 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -1033,10 +1033,12 @@ libs:
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
+ - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
@@ -1058,6 +1060,7 @@ libs:
- src/core/lib/promise/sleep.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
+ - src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
@@ -1156,6 +1159,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/connectivity_state.h
@@ -2490,10 +2494,12 @@ libs:
- src/core/lib/load_balancing/lb_policy_registry.h
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
+ - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
@@ -2515,6 +2521,7 @@ libs:
- src/core/lib/promise/sleep.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
+ - src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
@@ -2582,6 +2589,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/connectivity_state.h
@@ -4624,10 +4632,12 @@ libs:
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/matchers/matchers.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
+ - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
@@ -4647,6 +4657,7 @@ libs:
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
+ - src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
@@ -4716,6 +4727,7 @@ libs:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@@ -7638,12 +7650,10 @@ targets:
- src/core/ext/transport/chaotic_good/client_transport.h
- src/core/ext/transport/chaotic_good/frame.h
- src/core/ext/transport/chaotic_good/frame_header.h
- - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- - src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
@@ -7668,12 +7678,10 @@ targets:
- src/core/ext/transport/chaotic_good/client_transport.h
- src/core/ext/transport/chaotic_good/frame.h
- src/core/ext/transport/chaotic_good/frame_header.h
- - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
- src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
- - src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
@@ -12921,7 +12929,6 @@ targets:
build: test
language: c++
headers:
- - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/join.h
- test/core/promise/test_wakeup_schedulers.h
src:
@@ -13122,7 +13129,6 @@ targets:
build: test
language: c++
headers:
- - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/join.h
- src/core/lib/transport/promise_endpoint.h
- test/core/promise/test_wakeup_schedulers.h
@@ -16126,8 +16132,8 @@ targets:
- test/core/promise/status_flag_test.cc
deps:
- gtest
- - absl/status:status
- absl/status:statusor
+ - gpr
uses_polling: false
- name: status_helper_test
gtest: true
@@ -17004,10 +17010,12 @@ targets:
- src/core/lib/load_balancing/lb_policy_registry.h
- src/core/lib/load_balancing/subchannel_interface.h
- src/core/lib/promise/activity.h
+ - src/core/lib/promise/all_ok.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/cancel_callback.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_seq.h
+ - src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
- src/core/lib/promise/detail/seq_state.h
@@ -17027,6 +17035,7 @@ targets:
- src/core/lib/promise/seq.h
- src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
+ - src/core/lib/promise/try_join.h
- src/core/lib/promise/try_seq.h
- src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
@@ -17071,6 +17080,7 @@ targets:
- src/core/lib/surface/lame_client.h
- src/core/lib/surface/server.h
- src/core/lib/surface/validate_metadata.h
+ - src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/custom_metadata.h
@@ -18003,8 +18013,10 @@ targets:
- src/core/lib/gprpp/bitset.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/detail/promise_like.h
+ - src/core/lib/promise/detail/status.h
- src/core/lib/promise/map.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_join.h
src:
@@ -18041,6 +18053,7 @@ targets:
- src/core/lib/promise/detail/seq_state.h
- src/core/lib/promise/detail/status.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/status_flag.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
src:
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 5bc1877ac30..83f07e2fe62 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -1128,10 +1128,12 @@ Pod::Spec.new do |s|
'src/core/lib/load_balancing/subchannel_interface.h',
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/all_ok.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
+ 'src/core/lib/promise/detail/join_state.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/seq_state.h',
@@ -1153,6 +1155,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/status_flag.h',
'src/core/lib/promise/trace.h',
+ 'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
@@ -1251,6 +1254,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
@@ -2364,10 +2368,12 @@ Pod::Spec.new do |s|
'src/core/lib/load_balancing/subchannel_interface.h',
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/all_ok.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
+ 'src/core/lib/promise/detail/join_state.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/seq_state.h',
@@ -2389,6 +2395,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/status_flag.h',
'src/core/lib/promise/trace.h',
+ 'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
@@ -2487,6 +2494,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 728112f7bb5..64a415514b5 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1737,10 +1737,12 @@ Pod::Spec.new do |s|
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/all_ok.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
+ 'src/core/lib/promise/detail/join_state.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/seq_state.h',
@@ -1765,6 +1767,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/status_flag.h',
'src/core/lib/promise/trace.cc',
'src/core/lib/promise/trace.h',
+ 'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/endpoint_addresses.cc',
'src/core/lib/resolver/endpoint_addresses.h',
@@ -1968,6 +1971,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/validate_metadata.cc',
'src/core/lib/surface/validate_metadata.h',
'src/core/lib/surface/version.cc',
+ 'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.cc',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
@@ -3130,10 +3134,12 @@ Pod::Spec.new do |s|
'src/core/lib/load_balancing/subchannel_interface.h',
'src/core/lib/matchers/matchers.h',
'src/core/lib/promise/activity.h',
+ 'src/core/lib/promise/all_ok.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/cancel_callback.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_seq.h',
+ 'src/core/lib/promise/detail/join_state.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
'src/core/lib/promise/detail/seq_state.h',
@@ -3155,6 +3161,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/status_flag.h',
'src/core/lib/promise/trace.h',
+ 'src/core/lib/promise/try_join.h',
'src/core/lib/promise/try_seq.h',
'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
@@ -3253,6 +3260,7 @@ Pod::Spec.new do |s|
'src/core/lib/surface/lame_client.h',
'src/core/lib/surface/server.h',
'src/core/lib/surface/validate_metadata.h',
+ 'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/connectivity_state.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 67b0773a721..82993ca7ac9 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1640,10 +1640,12 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/matchers/matchers.h )
s.files += %w( src/core/lib/promise/activity.cc )
s.files += %w( src/core/lib/promise/activity.h )
+ s.files += %w( src/core/lib/promise/all_ok.h )
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/cancel_callback.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
+ s.files += %w( src/core/lib/promise/detail/join_state.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )
s.files += %w( src/core/lib/promise/detail/promise_like.h )
s.files += %w( src/core/lib/promise/detail/seq_state.h )
@@ -1668,6 +1670,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/status_flag.h )
s.files += %w( src/core/lib/promise/trace.cc )
s.files += %w( src/core/lib/promise/trace.h )
+ s.files += %w( src/core/lib/promise/try_join.h )
s.files += %w( src/core/lib/promise/try_seq.h )
s.files += %w( src/core/lib/resolver/endpoint_addresses.cc )
s.files += %w( src/core/lib/resolver/endpoint_addresses.h )
@@ -1871,6 +1874,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/surface/validate_metadata.cc )
s.files += %w( src/core/lib/surface/validate_metadata.h )
s.files += %w( src/core/lib/surface/version.cc )
+ s.files += %w( src/core/lib/surface/wait_for_cq_end_op.h )
s.files += %w( src/core/lib/transport/batch_builder.cc )
s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
diff --git a/package.xml b/package.xml
index 54d49bdff94..9ed1cf552fb 100644
--- a/package.xml
+++ b/package.xml
@@ -1622,10 +1622,12 @@
+
+
@@ -1650,6 +1652,7 @@
+
@@ -1853,6 +1856,7 @@
+
diff --git a/src/core/BUILD b/src/core/BUILD
index 1c9774b816b..7c273dce60e 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -439,6 +439,7 @@ grpc_cc_library(
],
deps = [
"promise_status",
+ "//:gpr",
"//:gpr_platform",
],
)
@@ -710,6 +711,7 @@ grpc_cc_library(
"join_state",
"map",
"poll",
+ "status_flag",
"//:gpr_platform",
],
)
@@ -810,6 +812,7 @@ grpc_cc_library(
"promise_like",
"promise_status",
"seq_state",
+ "status_flag",
"//:debug_location",
"//:gpr_platform",
],
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index e958b03a7d8..24f30687182 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -96,7 +96,7 @@ ClientTransport::ClientTransport(
},
// Write buffers to corresponding endpoints concurrently.
[this]() {
- return TryJoin(
+ return TryJoin(
control_endpoint_->Write(
std::move(control_endpoint_write_buffer_)),
data_endpoint_->Write(std::move(data_endpoint_write_buffer_)));
@@ -134,7 +134,7 @@ ClientTransport::ClientTransport(
.value());
// Read header and trailers from control endpoint.
// Read message padding and message from data endpoint.
- return TryJoin(
+ return TryJoin(
control_endpoint_->Read(frame_header_->GetFrameLength()),
data_endpoint_->Read(frame_header_->message_padding +
frame_header_->message_length));
diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h
index 86f9072fcfc..23ecdbfe84a 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.h
+++ b/src/core/ext/transport/chaotic_good/client_transport.h
@@ -105,7 +105,7 @@ class ClientTransport {
std::move(pipe_server_frames.sender))));
}
return TrySeq(
- TryJoin(
+ TryJoin(
// Continuously send client frame with client to server messages.
ForEach(std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc
index 3f8edc6dc4f..528a299632d 100644
--- a/src/core/lib/channel/channel_stack.cc
+++ b/src/core/lib/channel/channel_stack.cc
@@ -326,6 +326,11 @@ void grpc_channel_stack::InitClientCallSpine(
grpc_core::CallSpineInterface* call) {
for (size_t i = 0; i < count; i++) {
auto* elem = grpc_channel_stack_element(this, i);
+ if (elem->filter->init_call == nullptr) {
+ grpc_core::Crash(
+ absl::StrCat("Filter '", elem->filter->name,
+ "' does not support the call-v3 interface"));
+ }
elem->filter->init_call(elem, call);
}
}
@@ -334,6 +339,11 @@ void grpc_channel_stack::InitServerCallSpine(
grpc_core::CallSpineInterface* call) {
for (size_t i = 0; i < count; i++) {
auto* elem = grpc_channel_stack_element(this, count - 1 - i);
+ if (elem->filter->init_call == nullptr) {
+ grpc_core::Crash(
+ absl::StrCat("Filter '", elem->filter->name,
+ "' does not support the call-v3 interface"));
+ }
elem->filter->init_call(elem, call);
}
}
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index b7eccc29dd4..9ae7e14ee90 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -873,10 +873,12 @@ grpc_channel_filter MakeConnectedFilter() {
// do this, and I'm not sure what that is yet. This is only "safe"
// because call stacks place no additional data after the last call
// element, and the last call element MUST be the connected channel.
- channel_stack->call_stack_size +=
- static_cast(elem->channel_data)
- ->transport->filter_stack_transport()
- ->SizeOfStream();
+ auto* transport =
+ static_cast(elem->channel_data)->transport;
+ if (transport->filter_stack_transport() != nullptr) {
+ channel_stack->call_stack_size +=
+ transport->filter_stack_transport()->SizeOfStream();
+ }
},
connected_channel_destroy_channel_elem,
connected_channel_get_channel_info,
@@ -884,13 +886,27 @@ grpc_channel_filter MakeConnectedFilter() {
};
}
-ArenaPromise MakeTransportCallPromise(
- Transport*, CallArgs, NextPromiseFactory) {
- Crash("unimplemented");
+ArenaPromise MakeClientTransportCallPromise(
+ Transport* transport, CallArgs call_args, NextPromiseFactory) {
+ auto spine = GetContext()->MakeCallSpine(std::move(call_args));
+ transport->client_transport()->StartCall(CallHandler{spine});
+ return Map(spine->server_trailing_metadata().receiver.Next(),
+ [](NextResult r) {
+ if (r.has_value()) {
+ auto md = std::move(r.value());
+ md->Set(GrpcStatusFromWire(), true);
+ return md;
+ }
+ auto m = GetContext()->MakePooled(
+ GetContext());
+ m->Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED);
+ m->Set(GrpcCallWasCancelled(), true);
+ return m;
+ });
}
-const grpc_channel_filter kPromiseBasedTransportFilter =
- MakeConnectedFilter();
+const grpc_channel_filter kClientPromiseBasedTransportFilter =
+ MakeConnectedFilter();
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL
const grpc_channel_filter kClientEmulatedFilter =
@@ -908,11 +924,37 @@ const grpc_channel_filter kServerEmulatedFilter =
MakeConnectedFilter();
#endif
-bool TransportSupportsPromiseBasedCalls(const ChannelArgs& args) {
+// noop filter for the v3 stack: placeholder for now because other code requires
+// we have a terminator.
+// TODO(ctiller): delete when v3 transition is complete.
+const grpc_channel_filter kServerPromiseBasedTransportFilter = {
+ nullptr,
+ [](grpc_channel_element*, CallArgs, NextPromiseFactory)
+ -> ArenaPromise { Crash("not implemented"); },
+ /* init_call: */ [](grpc_channel_element*, CallSpineInterface*) {},
+ connected_channel_start_transport_op,
+ 0,
+ nullptr,
+ set_pollset_or_pollset_set,
+ nullptr,
+ sizeof(channel_data),
+ connected_channel_init_channel_elem,
+ +[](grpc_channel_stack*, grpc_channel_element*) {},
+ connected_channel_destroy_channel_elem,
+ connected_channel_get_channel_info,
+ "connected",
+};
+
+bool TransportSupportsClientPromiseBasedCalls(const ChannelArgs& args) {
auto* transport = args.GetObject();
return transport->client_transport() != nullptr;
}
+bool TransportSupportsServerPromiseBasedCalls(const ChannelArgs& args) {
+ auto* transport = args.GetObject();
+ return transport->server_transport() != nullptr;
+}
+
} // namespace
void RegisterConnectedChannel(CoreConfiguration::Builder* builder) {
@@ -925,32 +967,33 @@ void RegisterConnectedChannel(CoreConfiguration::Builder* builder) {
// Option 1, and our ideal: the transport supports promise based calls,
// and so we simply use the transport directly.
builder->channel_init()
- ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kPromiseBasedTransportFilter)
+ ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL,
+ &kClientPromiseBasedTransportFilter)
.Terminal()
- .If(TransportSupportsPromiseBasedCalls);
+ .If(TransportSupportsClientPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL,
- &kPromiseBasedTransportFilter)
+ &kClientPromiseBasedTransportFilter)
.Terminal()
- .If(TransportSupportsPromiseBasedCalls);
+ .If(TransportSupportsClientPromiseBasedCalls);
builder->channel_init()
- ->RegisterFilter(GRPC_SERVER_CHANNEL, &kPromiseBasedTransportFilter)
+ ->RegisterFilter(GRPC_SERVER_CHANNEL, &kServerPromiseBasedTransportFilter)
.Terminal()
- .If(TransportSupportsPromiseBasedCalls);
+ .If(TransportSupportsServerPromiseBasedCalls);
// Option 2: the transport does not support promise based calls.
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kClientEmulatedFilter)
.Terminal()
- .IfNot(TransportSupportsPromiseBasedCalls);
+ .IfNot(TransportSupportsClientPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &kClientEmulatedFilter)
.Terminal()
- .IfNot(TransportSupportsPromiseBasedCalls);
+ .IfNot(TransportSupportsClientPromiseBasedCalls);
builder->channel_init()
->RegisterFilter(GRPC_SERVER_CHANNEL, &kServerEmulatedFilter)
.Terminal()
- .IfNot(TransportSupportsPromiseBasedCalls);
+ .IfNot(TransportSupportsServerPromiseBasedCalls);
}
} // namespace grpc_core
diff --git a/src/core/lib/promise/detail/promise_factory.h b/src/core/lib/promise/detail/promise_factory.h
index d278a650782..127819598a2 100644
--- a/src/core/lib/promise/detail/promise_factory.h
+++ b/src/core/lib/promise/detail/promise_factory.h
@@ -18,6 +18,7 @@
#include
#include
+#include
#include
#include "absl/meta/type_traits.h"
diff --git a/src/core/lib/promise/latch.h b/src/core/lib/promise/latch.h
index 4ade84b8939..67002234b7c 100644
--- a/src/core/lib/promise/latch.h
+++ b/src/core/lib/promise/latch.h
@@ -185,6 +185,8 @@ class Latch {
waiter_.Wake();
}
+ bool is_set() const { return is_set_; }
+
private:
std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x",
diff --git a/src/core/lib/promise/status_flag.h b/src/core/lib/promise/status_flag.h
index 0e5af4f0da8..d9067509c32 100644
--- a/src/core/lib/promise/status_flag.h
+++ b/src/core/lib/promise/status_flag.h
@@ -21,6 +21,8 @@
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
+#include
+
#include "src/core/lib/promise/detail/status.h"
namespace grpc_core {
@@ -41,6 +43,16 @@ struct StatusCastImpl {
static absl::Status Cast(Success) { return absl::OkStatus(); }
};
+template <>
+struct StatusCastImpl {
+ static absl::Status Cast(Failure) { return absl::CancelledError(); }
+};
+
+template
+struct StatusCastImpl, Failure> {
+ static absl::StatusOr Cast(Failure) { return absl::CancelledError(); }
+};
+
// A boolean representing whether an operation succeeded (true) or failed
// (false).
class StatusFlag {
@@ -91,18 +103,25 @@ class ValueOrFailure {
ValueOrFailure(T value) : value_(std::move(value)) {}
// NOLINTNEXTLINE(google-explicit-constructor)
ValueOrFailure(Failure) {}
+ // NOLINTNEXTLINE(google-explicit-constructor)
+ ValueOrFailure(StatusFlag status) { GPR_ASSERT(!status.ok()); }
static ValueOrFailure FromOptional(absl::optional value) {
return ValueOrFailure{std::move(value)};
}
bool ok() const { return value_.has_value(); }
+ StatusFlag status() const { return StatusFlag(ok()); }
const T& value() const { return value_.value(); }
T& value() { return value_.value(); }
const T& operator*() const { return *value_; }
T& operator*() { return *value_; }
+ bool operator==(const ValueOrFailure& other) const {
+ return value_ == other.value_;
+ }
+
private:
absl::optional value_;
};
@@ -117,13 +136,6 @@ inline T TakeValue(ValueOrFailure&& value) {
return std::move(value.value());
}
-template
-struct StatusCastImpl> {
- static absl::Status Cast(const ValueOrFailure flag) {
- return flag.ok() ? absl::OkStatus() : absl::CancelledError();
- }
-};
-
template
struct StatusCastImpl, ValueOrFailure> {
static absl::StatusOr Cast(ValueOrFailure value) {
@@ -132,6 +144,29 @@ struct StatusCastImpl, ValueOrFailure> {
}
};
+template
+struct StatusCastImpl, Failure> {
+ static ValueOrFailure Cast(Failure) {
+ return ValueOrFailure(Failure{});
+ }
+};
+
+template
+struct StatusCastImpl, StatusFlag&> {
+ static ValueOrFailure Cast(StatusFlag f) {
+ GPR_ASSERT(!f.ok());
+ return ValueOrFailure(Failure{});
+ }
+};
+
+template
+struct StatusCastImpl, StatusFlag> {
+ static ValueOrFailure Cast(StatusFlag f) {
+ GPR_ASSERT(!f.ok());
+ return ValueOrFailure(Failure{});
+ }
+};
+
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H
\ No newline at end of file
diff --git a/src/core/lib/promise/try_join.h b/src/core/lib/promise/try_join.h
index 14503db5796..be3354dc9b7 100644
--- a/src/core/lib/promise/try_join.h
+++ b/src/core/lib/promise/try_join.h
@@ -27,6 +27,7 @@
#include "src/core/lib/promise/detail/join_state.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/status_flag.h"
namespace grpc_core {
@@ -44,48 +45,68 @@ T IntoResult(absl::StatusOr* status) {
inline Empty IntoResult(absl::Status*) { return Empty{}; }
// Traits object to pass to BasicJoin
+template class Result>
struct TryJoinTraits {
template
- using ResultType = absl::StatusOr>;
+ using ResultType = Result>;
template
static bool IsOk(const absl::StatusOr& x) {
return x.ok();
}
static bool IsOk(const absl::Status& x) { return x.ok(); }
+ static bool IsOk(StatusFlag x) { return x.ok(); }
+ template
+ static bool IsOk(const ValueOrFailure& x) {
+ return x.ok();
+ }
template
static T Unwrapped(absl::StatusOr x) {
return std::move(*x);
}
+ template
+ static T Unwrapped(ValueOrFailure x) {
+ return std::move(*x);
+ }
static Empty Unwrapped(absl::Status) { return Empty{}; }
+ static Empty Unwrapped(StatusFlag) { return Empty{}; }
template
static R EarlyReturn(absl::StatusOr x) {
return x.status();
}
template
static R EarlyReturn(absl::Status x) {
- return x;
+ return StatusCast(std::move(x));
+ }
+ template
+ static R EarlyReturn(StatusFlag x) {
+ return StatusCast(x);
+ }
+ template
+ static R EarlyReturn(const ValueOrFailure& x) {
+ GPR_ASSERT(!x.ok());
+ return StatusCast(Failure{});
}
template
static auto FinalReturn(A&&... a) {
- return absl::StatusOr>(
- std::make_tuple(std::forward(a)...));
+ return Result>(std::make_tuple(std::forward(a)...));
}
};
// Implementation of TryJoin combinator.
-template
+template class R, typename... Promises>
class TryJoin {
public:
explicit TryJoin(Promises... promises) : state_(std::move(promises)...) {}
auto operator()() { return state_.PollOnce(); }
private:
- JoinState state_;
+ JoinState, Promises...> state_;
};
+template class R>
struct WrapInStatusOrTuple {
template
- absl::StatusOr> operator()(absl::StatusOr x) {
+ R> operator()(R x) {
if (!x.ok()) return x.status();
return std::make_tuple(std::move(*x));
}
@@ -96,14 +117,14 @@ struct WrapInStatusOrTuple {
// Run all promises.
// If any fail, cancel the rest and return the failure.
// If all succeed, return Ok(tuple-of-results).
-template
-promise_detail::TryJoin TryJoin(Promises... promises) {
- return promise_detail::TryJoin(std::move(promises)...);
+template class R, typename... Promises>
+promise_detail::TryJoin TryJoin(Promises... promises) {
+ return promise_detail::TryJoin(std::move(promises)...);
}
-template
+template class R, typename F>
auto TryJoin(F promise) {
- return Map(promise, promise_detail::WrapInStatusOrTuple{});
+ return Map(promise, promise_detail::WrapInStatusOrTuple{});
}
} // namespace grpc_core
diff --git a/src/core/lib/promise/try_seq.h b/src/core/lib/promise/try_seq.h
index 8ef145d1814..ca04904ab1d 100644
--- a/src/core/lib/promise/try_seq.h
+++ b/src/core/lib/promise/try_seq.h
@@ -31,6 +31,7 @@
#include "src/core/lib/promise/detail/seq_state.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/status_flag.h"
namespace grpc_core {
@@ -89,6 +90,7 @@ struct TrySeqTraitsWithSfinae> {
return run_next(std::move(prior));
}
};
+
template
struct TakeValueExists {
static constexpr bool value = false;
@@ -146,7 +148,7 @@ struct TrySeqTraitsWithSfinae<
template
static R ReturnValue(T&& status) {
GPR_DEBUG_ASSERT(!IsStatusOk(status));
- return StatusCast(std::move(status));
+ return StatusCast(status.status());
}
template
static Poll CheckResultAndRunNext(T prior, RunNext run_next) {
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 6d3f5c6eaa5..1bb29fa6e25 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -80,6 +80,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/all_ok.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
@@ -89,6 +90,7 @@
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -98,6 +100,7 @@
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/surface/validate_metadata.h"
+#include "src/core/lib/surface/wait_for_cq_end_op.h"
#include "src/core/lib/transport/batch_builder.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
@@ -1987,22 +1990,45 @@ bool ValidateMetadata(size_t count, grpc_metadata* metadata) {
// PromiseBasedCall
// Will be folded into Call once the promise conversion is done
-class PromiseBasedCall : public Call,
- public Party,
- public grpc_event_engine::experimental::EventEngine::
- Closure /* for deadlines */ {
+class BasicPromiseBasedCall : public Call,
+ public Party,
+ public grpc_event_engine::experimental::
+ EventEngine::Closure /* for deadlines */ {
public:
- PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
- const grpc_call_create_args& args);
+ using Call::arena;
- void ContextSet(grpc_context_index elem, void* value,
- void (*destroy)(void* value)) override;
- void* ContextGet(grpc_context_index elem) const override;
- void SetCompletionQueue(grpc_completion_queue* cq) override;
- bool Completed() final { return finished_.IsSet(); }
+ BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
+ const grpc_call_create_args& args)
+ : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
+ args.channel->Ref()),
+ Party(arena, initial_external_refs != 0 ? 1 : 0),
+ external_refs_(initial_external_refs),
+ cq_(args.cq) {
+ if (args.cq != nullptr) {
+ GRPC_CQ_INTERNAL_REF(args.cq, "bind");
+ }
+ }
+
+ ~BasicPromiseBasedCall() override {
+ if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind");
+ for (int i = 0; i < GRPC_CONTEXT_COUNT; i++) {
+ if (context_[i].destroy) {
+ context_[i].destroy(context_[i].value);
+ }
+ }
+ }
+
+ // Implementation of EventEngine::Closure, called when deadline expires
+ void Run() final;
virtual void OrphanCall() = 0;
+ virtual ServerCallContext* server_call_context() { return nullptr; }
+ void SetCompletionQueue(grpc_completion_queue* cq) final {
+ cq_ = cq;
+ GRPC_CQ_INTERNAL_REF(cq, "bind");
+ }
+
// Implementation of call refcounting: move this to DualRefCounted once we
// don't need to maintain FilterStackCall compatibility
void ExternalRef() final {
@@ -2039,9 +2065,18 @@ class PromiseBasedCall : public Call,
[](Empty) {});
}
- // This should return nullptr for the promise stack (and alternative means
- // for that functionality be invented)
- grpc_call_stack* call_stack() override { return nullptr; }
+ void ContextSet(grpc_context_index elem, void* value,
+ void (*destroy)(void*)) final {
+ if (context_[elem].destroy != nullptr) {
+ context_[elem].destroy(context_[elem].value);
+ }
+ context_[elem].value = value;
+ context_[elem].destroy = destroy;
+ }
+
+ void* ContextGet(grpc_context_index elem) const final {
+ return context_[elem].value;
+ }
void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
@@ -2050,40 +2085,149 @@ class PromiseBasedCall : public Call,
return deadline_;
}
- // Implementation of EventEngine::Closure, called when deadline expires
- void Run() override;
-
- virtual ServerCallContext* server_call_context() { return nullptr; }
- bool failed_before_recv_message() const final {
- return failed_before_recv_message_.load(std::memory_order_relaxed);
+ // Accept the stats from the context (call once we have proof the transport is
+ // done with them).
+ void AcceptTransportStatsFromContext() {
+ final_stats_ = *call_context_.call_stats();
}
- grpc_event_engine::experimental::EventEngine* event_engine() const override {
- return channel()->event_engine();
- }
+ // This should return nullptr for the promise stack (and alternative means
+ // for that functionality be invented)
+ grpc_call_stack* call_stack() final { return nullptr; }
- using Call::arena;
+ virtual RefCountedPtr MakeCallSpine(CallArgs) {
+ Crash("Not implemented");
+ }
protected:
class ScopedContext
: public ScopedActivity,
- public BatchBuilder,
- public promise_detail::Context,
public promise_detail::Context,
public promise_detail::Context,
public promise_detail::Context,
public promise_detail::Context {
public:
- explicit ScopedContext(PromiseBasedCall* call)
+ explicit ScopedContext(BasicPromiseBasedCall* call)
: ScopedActivity(call),
- BatchBuilder(&call->batch_payload_),
- promise_detail::Context(this),
promise_detail::Context(call->arena()),
promise_detail::Context(call->context_),
promise_detail::Context(&call->call_context_),
promise_detail::Context(&call->finalization_) {}
};
+ grpc_call_context_element* context() { return context_; }
+
+ grpc_completion_queue* cq() { return cq_; }
+
+ // At the end of the call run any finalization actions.
+ void SetFinalizationStatus(grpc_status_code status, Slice status_details) {
+ final_message_ = std::move(status_details);
+ final_status_ = status;
+ }
+
+ grpc_event_engine::experimental::EventEngine* event_engine() const override {
+ return channel()->event_engine();
+ }
+
+ private:
+ void PartyOver() final {
+ {
+ ScopedContext ctx(this);
+ std::string message;
+ grpc_call_final_info final_info;
+ final_info.stats = final_stats_;
+ final_info.final_status = final_status_;
+ // TODO(ctiller): change type here so we don't need to copy this string.
+ final_info.error_string = nullptr;
+ if (!final_message_.empty()) {
+ message = std::string(final_message_.begin(), final_message_.end());
+ final_info.error_string = message.c_str();
+ }
+ final_info.stats.latency =
+ gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time());
+ finalization_.Run(&final_info);
+ CancelRemainingParticipants();
+ arena()->DestroyManagedNewObjects();
+ }
+ DeleteThis();
+ }
+
+ // Double refcounted for now: party owns the internal refcount, we track the
+ // external refcount. Figure out a better scheme post-promise conversion.
+ std::atomic external_refs_;
+ CallFinalization finalization_;
+ CallContext call_context_{this};
+ // Contexts for various subsystems (security, tracing, ...).
+ grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
+ grpc_call_stats final_stats_{};
+ // Current deadline.
+ Mutex deadline_mu_;
+ Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
+ grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
+ deadline_mu_) deadline_task_;
+ Slice final_message_;
+ grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN;
+ grpc_completion_queue* cq_;
+};
+
+void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) {
+ MutexLock lock(&deadline_mu_);
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s",
+ DebugTag().c_str(), deadline_.ToString().c_str(),
+ deadline.ToString().c_str());
+ }
+ if (deadline >= deadline_) return;
+ auto* const event_engine = channel()->event_engine();
+ if (deadline_ != Timestamp::InfFuture()) {
+ if (!event_engine->Cancel(deadline_task_)) return;
+ } else {
+ InternalRef("deadline");
+ }
+ deadline_ = deadline;
+ deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
+}
+
+void BasicPromiseBasedCall::ResetDeadline() {
+ MutexLock lock(&deadline_mu_);
+ if (deadline_ == Timestamp::InfFuture()) return;
+ auto* const event_engine = channel()->event_engine();
+ if (!event_engine->Cancel(deadline_task_)) return;
+ deadline_ = Timestamp::InfFuture();
+ InternalUnref("deadline");
+}
+
+void BasicPromiseBasedCall::Run() {
+ ApplicationCallbackExecCtx callback_exec_ctx;
+ ExecCtx exec_ctx;
+ CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
+ InternalUnref("deadline");
+}
+
+class PromiseBasedCall : public BasicPromiseBasedCall {
+ public:
+ PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
+ const grpc_call_create_args& args);
+
+ bool Completed() final { return finished_.IsSet(); }
+
+ bool failed_before_recv_message() const final {
+ return failed_before_recv_message_.load(std::memory_order_relaxed);
+ }
+
+ using Call::arena;
+
+ protected:
+ class ScopedContext : public BasicPromiseBasedCall::ScopedContext,
+ public BatchBuilder,
+ public promise_detail::Context {
+ public:
+ explicit ScopedContext(PromiseBasedCall* call)
+ : BasicPromiseBasedCall::ScopedContext(call),
+ BatchBuilder(&call->batch_payload_),
+ promise_detail::Context(this) {}
+ };
+
class Completion {
public:
Completion() : index_(kNullIndex) {}
@@ -2110,15 +2254,6 @@ class PromiseBasedCall : public Call,
uint8_t index_;
};
- ~PromiseBasedCall() override {
- if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind");
- for (int i = 0; i < GRPC_CONTEXT_COUNT; i++) {
- if (context_[i].destroy) {
- context_[i].destroy(context_[i].value);
- }
- }
- }
-
// Enumerates why a Completion is still pending
enum class PendingOp {
// We're in the midst of starting a batch of operations
@@ -2185,26 +2320,6 @@ class PromiseBasedCall : public Call,
// Mark the completion as infallible. Overrides FailCompletion to report
// success always.
void ForceCompletionSuccess(const Completion& completion);
- // Accept the stats from the context (call once we have proof the transport is
- // done with them).
- // Right now this means that promise based calls do not record correct stats
- // with census if they are cancelled.
- // TODO(ctiller): this should be remedied before promise based calls are
- // dexperimentalized.
- void AcceptTransportStatsFromContext() {
- final_stats_ = *call_context_.call_stats();
- }
-
- grpc_completion_queue* cq() { return cq_; }
-
- void CToMetadata(grpc_metadata* metadata, size_t count,
- grpc_metadata_batch* batch);
-
- // At the end of the call run any finalization actions.
- void SetFinalizationStatus(grpc_status_code status, Slice status_details) {
- final_message_ = std::move(status_details);
- final_status_ = status;
- }
std::string PresentAndCompletionText(const char* caption, bool has,
const Completion& completion) const {
@@ -2355,45 +2470,7 @@ class PromiseBasedCall : public Call,
grpc_cq_completion completion;
};
- void PartyOver() override {
- {
- ScopedContext ctx(this);
- std::string message;
- grpc_call_final_info final_info;
- final_info.stats = final_stats_;
- final_info.final_status = final_status_;
- // TODO(ctiller): change type here so we don't need to copy this string.
- final_info.error_string = nullptr;
- if (!final_message_.empty()) {
- message = std::string(final_message_.begin(), final_message_.end());
- final_info.error_string = message.c_str();
- }
- final_info.stats.latency =
- gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time());
- finalization_.Run(&final_info);
- CancelRemainingParticipants();
- arena()->DestroyManagedNewObjects();
- }
- DeleteThis();
- }
-
- CallContext call_context_{this};
- // Double refcounted for now: party owns the internal refcount, we track the
- // external refcount. Figure out a better scheme post-promise conversion.
- std::atomic external_refs_;
- // Contexts for various subsystems (security, tracing, ...).
- grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
- grpc_completion_queue* cq_;
CompletionInfo completion_info_[6];
- grpc_call_stats final_stats_{};
- Slice final_message_;
- grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN;
- CallFinalization finalization_;
- // Current deadline.
- Mutex deadline_mu_;
- Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
- grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
- deadline_mu_) deadline_task_;
ExternallyObservableLatch finished_;
// Non-zero with an outstanding GRPC_OP_SEND_INITIAL_METADATA or
// GRPC_OP_SEND_MESSAGE (one count each), and 0 once those payloads have been
@@ -2403,7 +2480,7 @@ class PromiseBasedCall : public Call,
// Waiter for when sends_queued_ becomes 0.
IntraActivityWaiter waiting_for_queued_sends_;
grpc_byte_buffer** recv_message_ = nullptr;
- grpc_transport_stream_op_batch_payload batch_payload_{context_};
+ grpc_transport_stream_op_batch_payload batch_payload_{context()};
};
template
@@ -2423,18 +2500,10 @@ grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args,
PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
const grpc_call_create_args& args)
- : Call(arena, args.server_transport_data == nullptr, args.send_deadline,
- args.channel->Ref()),
- Party(arena, initial_external_refs != 0 ? 1 : 0),
- external_refs_(initial_external_refs),
- cq_(args.cq) {
- if (args.cq != nullptr) {
- GRPC_CQ_INTERNAL_REF(args.cq, "bind");
- }
-}
+ : BasicPromiseBasedCall(arena, initial_external_refs, args) {}
-void PromiseBasedCall::CToMetadata(grpc_metadata* metadata, size_t count,
- grpc_metadata_batch* b) {
+static void CToMetadata(grpc_metadata* metadata, size_t count,
+ grpc_metadata_batch* b) {
for (size_t i = 0; i < count; i++) {
grpc_metadata* md = &metadata[i];
auto key = StringViewFromSlice(md->key);
@@ -2451,19 +2520,6 @@ void PromiseBasedCall::CToMetadata(grpc_metadata* metadata, size_t count,
}
}
-void PromiseBasedCall::ContextSet(grpc_context_index elem, void* value,
- void (*destroy)(void*)) {
- if (context_[elem].destroy != nullptr) {
- context_[elem].destroy(context_[elem].value);
- }
- context_[elem].value = value;
- context_[elem].destroy = destroy;
-}
-
-void* PromiseBasedCall::ContextGet(grpc_context_index elem) const {
- return context_[elem].value;
-}
-
PromiseBasedCall::Completion PromiseBasedCall::StartCompletion(
void* tag, bool is_closure, const grpc_op* ops) {
Completion c(BatchSlotForOp(ops[0].op));
@@ -2543,45 +2599,6 @@ void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
}
}
-void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) {
- cq_ = cq;
- GRPC_CQ_INTERNAL_REF(cq, "bind");
-}
-
-void PromiseBasedCall::UpdateDeadline(Timestamp deadline) {
- MutexLock lock(&deadline_mu_);
- if (grpc_call_trace.enabled()) {
- gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s",
- DebugTag().c_str(), deadline_.ToString().c_str(),
- deadline.ToString().c_str());
- }
- if (deadline >= deadline_) return;
- auto* const event_engine = channel()->event_engine();
- if (deadline_ != Timestamp::InfFuture()) {
- if (!event_engine->Cancel(deadline_task_)) return;
- } else {
- InternalRef("deadline");
- }
- deadline_ = deadline;
- deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
-}
-
-void PromiseBasedCall::ResetDeadline() {
- MutexLock lock(&deadline_mu_);
- if (deadline_ == Timestamp::InfFuture()) return;
- auto* const event_engine = channel()->event_engine();
- if (!event_engine->Cancel(deadline_task_)) return;
- deadline_ = Timestamp::InfFuture();
- InternalUnref("deadline");
-}
-
-void PromiseBasedCall::Run() {
- ApplicationCallbackExecCtx callback_exec_ctx;
- ExecCtx exec_ctx;
- CancelWithError(absl::DeadlineExceededError("Deadline exceeded"));
- InternalUnref("deadline");
-}
-
void PromiseBasedCall::StartSendMessage(const grpc_op& op,
const Completion& completion,
PipeSender* sender,
@@ -2694,6 +2711,13 @@ ServerCallContext* CallContext::server_call_context() {
return call_->server_call_context();
}
+RefCountedPtr CallContext::MakeCallSpine(
+ CallArgs call_args) {
+ return call_->MakeCallSpine(std::move(call_args));
+}
+
+grpc_call* CallContext::c_call() { return call_->c_ptr(); }
+
///////////////////////////////////////////////////////////////////////////////
// PublishMetadataArray
@@ -2814,6 +2838,81 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
return absl::StrFormat("CLIENT_CALL[%p]: ", this);
}
+ RefCountedPtr MakeCallSpine(CallArgs call_args) final {
+ class WrappingCallSpine final : public CallSpineInterface {
+ public:
+ WrappingCallSpine(ClientPromiseBasedCall* call,
+ ClientMetadataHandle metadata)
+ : call_(call) {
+ call_->InternalRef("call-spine");
+ SpawnInfallible("send_client_initial_metadata",
+ [this, metadata = std::move(metadata)]() mutable {
+ return Map(client_initial_metadata_.sender.Push(
+ std::move(metadata)),
+ [](bool) { return Empty{}; });
+ });
+ SpawnInfallible("monitor_cancellation", [this]() {
+ return Seq(cancel_error_.Wait(),
+ [this](ServerMetadataHandle trailing_metadata) {
+ Crash("here");
+ return Map(server_trailing_metadata_.sender.Push(
+ std::move(trailing_metadata)),
+ [](bool) { return Empty{}; });
+ });
+ });
+ }
+
+ ~WrappingCallSpine() override { call_->InternalUnref("call-spine"); }
+
+ Pipe& client_initial_metadata() override {
+ return client_initial_metadata_;
+ }
+
+ Pipe& client_to_server_messages() override {
+ return call_->client_to_server_messages_;
+ }
+
+ Pipe& server_initial_metadata() override {
+ return call_->server_initial_metadata_;
+ }
+
+ Pipe& server_to_client_messages() override {
+ return call_->server_to_client_messages_;
+ }
+
+ Pipe& server_trailing_metadata() override {
+ return server_trailing_metadata_;
+ }
+
+ Latch& cancel_latch() override {
+ return cancel_error_;
+ }
+
+ Party& party() override { return *call_; }
+
+ void IncrementRefCount() override { refs_.Ref(); }
+ void Unref() override {
+ if (refs_.Unref()) delete this;
+ }
+
+ private:
+ RefCount refs_;
+ ClientPromiseBasedCall* const call_;
+ std::atomic sent_trailing_metadata_{false};
+ Pipe client_initial_metadata_{call_->arena()};
+ Pipe server_trailing_metadata_{call_->arena()};
+ Latch cancel_error_;
+ };
+ GPR_ASSERT(call_args.server_initial_metadata ==
+ &server_initial_metadata_.sender);
+ GPR_ASSERT(call_args.client_to_server_messages ==
+ &client_to_server_messages_.receiver);
+ GPR_ASSERT(call_args.server_to_client_messages ==
+ &server_to_client_messages_.sender);
+ return MakeRefCounted(
+ this, std::move(call_args.client_initial_metadata));
+ }
+
private:
// Finish the call with the given status/trailing metadata.
void Finish(ServerMetadataHandle trailing_metadata);
@@ -3157,7 +3256,8 @@ void ClientPromiseBasedCall::StartRecvStatusOnClient(
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
-class ServerPromiseBasedCall final : public PromiseBasedCall {
+class ServerPromiseBasedCall final : public PromiseBasedCall,
+ public ServerCallContext {
public:
ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args);
@@ -3165,7 +3265,9 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
void CancelWithError(grpc_error_handle) override;
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) override;
- bool is_trailers_only() const override { abort(); }
+ bool is_trailers_only() const override {
+ Crash("is_trailers_only not implemented for server calls");
+ }
absl::string_view GetServerAuthority() const override {
const Slice* authority_metadata =
client_initial_metadata_->get_pointer(HttpAuthorityMetadata());
@@ -3198,7 +3300,15 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
return absl::StrFormat("SERVER_CALL[%p]: ", this);
}
- ServerCallContext* server_call_context() override { return &call_context_; }
+ ServerCallContext* server_call_context() override { return this; }
+
+ const void* server_stream_data() override { return server_transport_data_; }
+ void PublishInitialMetadata(
+ ClientMetadataHandle metadata,
+ grpc_metadata_array* publish_initial_metadata) override;
+ ArenaPromise MakeTopOfServerCallPromise(
+ CallArgs call_args, grpc_completion_queue* cq,
+ absl::FunctionRef publish) override;
private:
class RecvCloseOpCancelState {
@@ -3283,14 +3393,12 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
std::atomic state_{kUnset};
};
- grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const;
void CommitBatch(const grpc_op* ops, size_t nops,
const Completion& completion);
void Finish(ServerMetadataHandle result);
- friend class ServerCallContext;
- ServerCallContext call_context_;
Server* const server_;
+ const void* const server_transport_data_;
PipeSender* server_initial_metadata_ = nullptr;
PipeSender* server_to_client_messages_ = nullptr;
PipeReceiver* client_to_server_messages_ = nullptr;
@@ -3304,8 +3412,8 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
grpc_call_create_args* args)
: PromiseBasedCall(arena, 0, *args),
- call_context_(this, args->server_transport_data),
- server_(args->server) {
+ server_(args->server),
+ server_transport_data_(args->server_transport_data) {
global_stats().IncrementServerCallsCreated();
channelz::ServerNode* channelz_node = server_->channelz_node();
if (channelz_node != nullptr) {
@@ -3375,8 +3483,7 @@ void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
PropagateCancellationToChildren();
}
-grpc_call_error ServerPromiseBasedCall::ValidateBatch(const grpc_op* ops,
- size_t nops) const {
+grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) {
BitSet<8> got_ops;
for (size_t op_idx = 0; op_idx < nops; op_idx++) {
const grpc_op& op = ops[op_idx];
@@ -3536,7 +3643,7 @@ grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops,
EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
return GRPC_CALL_OK;
}
- const grpc_call_error validation_result = ValidateBatch(ops, nops);
+ const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
if (validation_result != GRPC_CALL_OK) {
return validation_result;
}
@@ -3570,35 +3677,404 @@ void ServerPromiseBasedCall::CancelWithError(absl::Status error) {
#endif
#ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL
+void ServerPromiseBasedCall::PublishInitialMetadata(
+ ClientMetadataHandle metadata,
+ grpc_metadata_array* publish_initial_metadata) {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
+ metadata->DebugString().c_str());
+ }
+ PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
+ client_initial_metadata_ = std::move(metadata);
+}
+
ArenaPromise
-ServerCallContext::MakeTopOfServerCallPromise(
+ServerPromiseBasedCall::MakeTopOfServerCallPromise(
CallArgs call_args, grpc_completion_queue* cq,
- grpc_metadata_array* publish_initial_metadata,
absl::FunctionRef publish) {
- call_->SetCompletionQueue(cq);
+ SetCompletionQueue(cq);
call_args.polling_entity->Set(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)));
- call_->server_to_client_messages_ = call_args.server_to_client_messages;
- call_->client_to_server_messages_ = call_args.client_to_server_messages;
- call_->server_initial_metadata_ = call_args.server_initial_metadata;
- call_->client_initial_metadata_ =
- std::move(call_args.client_initial_metadata);
- call_->set_send_deadline(call_->deadline());
- call_->ProcessIncomingInitialMetadata(*call_->client_initial_metadata_);
- PublishMetadataArray(call_->client_initial_metadata_.get(),
- publish_initial_metadata, false);
- call_->ExternalRef();
- publish(call_->c_ptr());
- return Seq(call_->server_to_client_messages_->AwaitClosed(),
- call_->send_trailing_metadata_.Wait());
+ server_to_client_messages_ = call_args.server_to_client_messages;
+ client_to_server_messages_ = call_args.client_to_server_messages;
+ server_initial_metadata_ = call_args.server_initial_metadata;
+ set_send_deadline(deadline());
+ ProcessIncomingInitialMetadata(*client_initial_metadata_);
+ ExternalRef();
+ publish(c_ptr());
+ return Seq(server_to_client_messages_->AwaitClosed(),
+ send_trailing_metadata_.Wait());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// CallSpine based Server Call
+
+class ServerCallSpine final : public CallSpineInterface,
+ public ServerCallContext,
+ public BasicPromiseBasedCall {
+ public:
+ ServerCallSpine(Server* server, Channel* channel, Arena* arena);
+
+ // CallSpineInterface
+ Pipe& client_initial_metadata() override {
+ return client_initial_metadata_;
+ }
+ Pipe& server_initial_metadata() override {
+ return server_initial_metadata_;
+ }
+ Pipe& client_to_server_messages() override {
+ return client_to_server_messages_;
+ }
+ Pipe& server_to_client_messages() override {
+ return server_to_client_messages_;
+ }
+ Pipe& server_trailing_metadata() override {
+ return server_trailing_metadata_;
+ }
+ Latch& cancel_latch() override { return cancel_latch_; }
+ Party& party() override { return *this; }
+ void IncrementRefCount() override { InternalRef("CallSpine"); }
+ void Unref() override { InternalUnref("CallSpine"); }
+
+ // PromiseBasedCall
+ void OrphanCall() override {}
+ void CancelWithError(grpc_error_handle error) override {
+ SpawnInfallible("CancelWithError", [this, error = std::move(error)] {
+ std::ignore = Cancel(ServerMetadataFromStatus(error));
+ return Empty{};
+ });
+ }
+ bool is_trailers_only() const override {
+ Crash("is_trailers_only not implemented for server calls");
+ }
+ absl::string_view GetServerAuthority() const override {
+ Crash("unimplemented");
+ }
+ grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
+ bool is_notify_tag_closure) override;
+
+ bool Completed() final { Crash("unimplemented"); }
+ bool failed_before_recv_message() const final { Crash("unimplemented"); }
+
+ ServerCallContext* server_call_context() override { return this; }
+ const void* server_stream_data() override { Crash("unimplemented"); }
+ void PublishInitialMetadata(
+ ClientMetadataHandle metadata,
+ grpc_metadata_array* publish_initial_metadata) override;
+ ArenaPromise MakeTopOfServerCallPromise(
+ CallArgs, grpc_completion_queue*,
+ absl::FunctionRef) override {
+ Crash("unimplemented");
+ }
+
+ bool RunParty() override {
+ ScopedContext ctx(this);
+ return Party::RunParty();
+ }
+
+ private:
+ void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
+ bool is_notify_tag_closure);
+ StatusFlag FinishRecvMessage(NextResult result);
+
+ std::string DebugTag() const override {
+ return absl::StrFormat("SERVER_CALL_SPINE[%p]: ", this);
+ }
+
+ // Initial metadata from client to server
+ Pipe client_initial_metadata_;
+ // Initial metadata from server to client
+ Pipe server_initial_metadata_;
+ // Messages travelling from the application to the transport.
+ Pipe client_to_server_messages_;
+ // Messages travelling from the transport to the application.
+ Pipe server_to_client_messages_;
+ // Trailing metadata from server to client
+ Pipe server_trailing_metadata_;
+ // Latch that can be set to terminate the call
+ Latch cancel_latch_;
+ grpc_byte_buffer** recv_message_ = nullptr;
+ ClientMetadataHandle client_initial_metadata_stored_;
+};
+
+ServerCallSpine::ServerCallSpine(Server* server, Channel* channel, Arena* arena)
+ : BasicPromiseBasedCall(
+ arena, 1, [channel, server]() -> grpc_call_create_args {
+ grpc_call_create_args args;
+ args.channel = channel->Ref();
+ args.server = server;
+ args.parent = nullptr;
+ args.propagation_mask = 0;
+ args.cq = nullptr;
+ args.pollset_set_alternative = nullptr;
+ args.server_transport_data = &args; // Arbitrary non-null pointer
+ args.send_deadline = Timestamp::InfFuture();
+ return args;
+ }()) {
+ global_stats().IncrementServerCallsCreated();
+ channel->channel_stack()->InitServerCallSpine(this);
+}
+
+void ServerCallSpine::PublishInitialMetadata(
+ ClientMetadataHandle metadata,
+ grpc_metadata_array* publish_initial_metadata) {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(),
+ metadata->DebugString().c_str());
+ }
+ PublishMetadataArray(metadata.get(), publish_initial_metadata, false);
+ client_initial_metadata_stored_ = std::move(metadata);
+}
+
+grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops,
+ void* notify_tag,
+ bool is_notify_tag_closure) {
+ if (nops == 0) {
+ EndOpImmediately(cq(), notify_tag, is_notify_tag_closure);
+ return GRPC_CALL_OK;
+ }
+ const grpc_call_error validation_result = ValidateServerBatch(ops, nops);
+ if (validation_result != GRPC_CALL_OK) {
+ return validation_result;
+ }
+ CommitBatch(ops, nops, notify_tag, is_notify_tag_closure);
+ return GRPC_CALL_OK;
+}
+
+namespace {
+template
+class MaybeOpImpl {
+ public:
+ using SetupResult = decltype(std::declval()(grpc_op()));
+ using PromiseFactory = promise_detail::OncePromiseFactory;
+ using Promise = typename PromiseFactory::Promise;
+ struct Dismissed {};
+ using State = absl::variant;
+
+ MaybeOpImpl() : state_(Dismissed{}) {}
+ explicit MaybeOpImpl(SetupResult result)
+ : state_(PromiseFactory(std::move(result))) {}
+
+ MaybeOpImpl(const MaybeOpImpl&) = delete;
+ MaybeOpImpl& operator=(const MaybeOpImpl&) = delete;
+ MaybeOpImpl(MaybeOpImpl&& other) noexcept : state_(MoveState(other.state_)) {}
+ MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept {
+ if (absl::holds_alternative(state_)) {
+ state_.template emplace();
+ return *this;
+ }
+ // Can't move after first poll => Promise is not an option
+ state_.template emplace(
+ std::move(absl::get(other.state_)));
+ return *this;
+ }
+
+ Poll operator()() {
+ if (absl::holds_alternative(state_)) return Success{};
+ if (absl::holds_alternative(state_)) {
+ auto& factory = absl::get(state_);
+ auto promise = factory.Make();
+ state_.template emplace(std::move(promise));
+ }
+ auto& promise = absl::get(state_);
+ return poll_cast(promise());
+ }
+
+ private:
+ State state_;
+
+ static State MoveState(State& state) {
+ if (absl::holds_alternative(state)) return Dismissed{};
+ // Can't move after first poll => Promise is not an option
+ return std::move(absl::get(state));
+ }
+};
+
+// MaybeOp captures a fairly complicated dance we need to do for the batch API.
+// We first check if an op is included or not, and if it is, we run the setup
+// function in the context of the API call (NOT in the call party).
+// This setup function returns a promise factory which we'll then run *in* the
+// party to do initial setup, and have it return the promise that we'll
+// ultimately poll on til completion.
+// Once we express our surface API in terms of core internal types this whole
+// dance will go away.
+template
+auto MaybeOp(const grpc_op* ops, uint8_t idx, SetupFn setup) {
+ if (idx == 255) {
+ return MaybeOpImpl();
+ } else {
+ return MaybeOpImpl(setup(ops[idx]));
+ }
+}
+} // namespace
+
+StatusFlag ServerCallSpine::FinishRecvMessage(
+ NextResult result) {
+ if (result.has_value()) {
+ MessageHandle& message = *result;
+ NoteLastMessageFlags(message->flags());
+ if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
+ *recv_message_ = grpc_raw_compressed_byte_buffer_create(
+ nullptr, 0, incoming_compression_algorithm());
+ } else {
+ *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0);
+ }
+ grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(),
+ &(*recv_message_)->data.raw.slice_buffer);
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[call] RecvMessage: outstanding_recv "
+ "finishes: received %" PRIdPTR " byte message",
+ DebugTag().c_str(),
+ (*recv_message_)->data.raw.slice_buffer.length);
+ }
+ return Success{};
+ }
+ if (result.cancelled()) {
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[call] RecvMessage: outstanding_recv "
+ "finishes: received end-of-stream with error",
+ DebugTag().c_str());
+ }
+ *recv_message_ = nullptr;
+ return Failure{};
+ }
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "%s[call] RecvMessage: outstanding_recv "
+ "finishes: received end-of-stream",
+ DebugTag().c_str());
+ }
+ *recv_message_ = nullptr;
+ return Success{};
+}
+
+void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops,
+ void* notify_tag,
+ bool is_notify_tag_closure) {
+ std::array got_ops{255, 255, 255, 255, 255, 255, 255, 255};
+ for (size_t op_idx = 0; op_idx < nops; op_idx++) {
+ const grpc_op& op = ops[op_idx];
+ got_ops[op.op] = op_idx;
+ }
+ if (!is_notify_tag_closure) grpc_cq_begin_op(cq(), notify_tag);
+ auto send_initial_metadata = MaybeOp(
+ ops, got_ops[GRPC_OP_SEND_INITIAL_METADATA], [this](const grpc_op& op) {
+ auto metadata = arena()->MakePooled(arena());
+ PrepareOutgoingInitialMetadata(op, *metadata);
+ CToMetadata(op.data.send_initial_metadata.metadata,
+ op.data.send_initial_metadata.count, metadata.get());
+ if (grpc_call_trace.enabled()) {
+ gpr_log(GPR_INFO, "%s[call] Send initial metadata",
+ DebugTag().c_str());
+ }
+ return [this, metadata = std::move(metadata)]() mutable {
+ return Map(server_initial_metadata_.sender.Push(std::move(metadata)),
+ [this](bool r) {
+ server_initial_metadata_.sender.Close();
+ return StatusFlag(r);
+ });
+ };
+ });
+ auto send_message =
+ MaybeOp(ops, got_ops[GRPC_OP_SEND_MESSAGE], [this](const grpc_op& op) {
+ SliceBuffer send;
+ grpc_slice_buffer_swap(
+ &op.data.send_message.send_message->data.raw.slice_buffer,
+ send.c_slice_buffer());
+ auto msg = arena()->MakePooled(std::move(send), op.flags);
+ return [this, msg = std::move(msg)]() mutable {
+ return Map(server_to_client_messages_.sender.Push(std::move(msg)),
+ [](bool r) { return StatusFlag(r); });
+ };
+ });
+ auto send_trailing_metadata = MaybeOp(
+ ops, got_ops[GRPC_OP_SEND_STATUS_FROM_SERVER], [this](const grpc_op& op) {
+ auto metadata = arena()->MakePooled(arena());
+ CToMetadata(op.data.send_status_from_server.trailing_metadata,
+ op.data.send_status_from_server.trailing_metadata_count,
+ metadata.get());
+ metadata->Set(GrpcStatusMetadata(),
+ op.data.send_status_from_server.status);
+ if (auto* details = op.data.send_status_from_server.status_details) {
+ // TODO(ctiller): this should not be a copy, but we have
+ // callers that allocate and pass in a slice created with
+ // grpc_slice_from_static_string and then delete the string
+ // after passing it in, which shouldn't be a supported API.
+ metadata->Set(GrpcMessageMetadata(),
+ Slice(grpc_slice_copy(*details)));
+ }
+ return [this, metadata = std::move(metadata)]() mutable {
+ server_to_client_messages_.sender.Close();
+ return Map(server_trailing_metadata_.sender.Push(std::move(metadata)),
+ [](bool r) { return StatusFlag(r); });
+ };
+ });
+ auto recv_message =
+ MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) {
+ GPR_ASSERT(recv_message_ == nullptr);
+ recv_message_ = op.data.recv_message.recv_message;
+ return [this]() mutable {
+ return Map(client_to_server_messages_.receiver.Next(),
+ [this](NextResult msg) {
+ return FinishRecvMessage(std::move(msg));
+ });
+ };
+ });
+ auto primary_ops = AllOk(
+ std::move(send_initial_metadata), std::move(send_message),
+ std::move(send_trailing_metadata), std::move(recv_message));
+ if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) {
+ auto recv_trailing_metadata = MaybeOp(
+ ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) {
+ return [this, cancelled = op.data.recv_close_on_server.cancelled]() {
+ return Map(server_trailing_metadata_.receiver.AwaitClosed(),
+ [cancelled](bool result) -> Success {
+ *cancelled = result ? 1 : 0;
+ Crash("return metadata here");
+ return Success{};
+ });
+ };
+ });
+ SpawnInfallible(
+ "final-batch",
+ [primary_ops = std::move(primary_ops),
+ recv_trailing_metadata = std::move(recv_trailing_metadata),
+ is_notify_tag_closure, notify_tag, this]() mutable {
+ return Seq(std::move(primary_ops), std::move(recv_trailing_metadata),
+ [is_notify_tag_closure, notify_tag, this](StatusFlag) {
+ return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
+ absl::OkStatus(), cq());
+ });
+ });
+ } else {
+ SpawnInfallible(
+ "batch", [primary_ops = std::move(primary_ops), is_notify_tag_closure,
+ notify_tag, this]() mutable {
+ return Seq(std::move(primary_ops), [is_notify_tag_closure, notify_tag,
+ this](StatusFlag r) {
+ return WaitForCqEndOp(is_notify_tag_closure, notify_tag,
+ StatusCast(r), cq());
+ });
+ });
+ }
+}
+
+RefCountedPtr MakeServerCall(Server* server,
+ Channel* channel) {
+ const auto initial_size = channel->CallSizeEstimate();
+ global_stats().IncrementCallInitialSize(initial_size);
+ auto alloc = Arena::CreateWithAlloc(initial_size, sizeof(ServerCallSpine),
+ channel->allocator());
+ auto* call = new (alloc.second) ServerCallSpine(server, channel, alloc.first);
+ return RefCountedPtr(call);
}
#else
-ArenaPromise
-ServerCallContext::MakeTopOfServerCallPromise(
- CallArgs, grpc_completion_queue*, grpc_metadata_array*,
- absl::FunctionRef) {
- (void)call_;
- Crash("Promise-based server call is not enabled");
+RefCountedPtr MakeServerCall(Server* server,
+ Channel* channel) {
+ Crash("not implemented");
}
#endif
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 958cb2f36e9..6653bb6a0dd 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -77,34 +77,35 @@ typedef struct grpc_call_create_args {
} grpc_call_create_args;
namespace grpc_core {
-class PromiseBasedCall;
+class BasicPromiseBasedCall;
class ServerPromiseBasedCall;
class ServerCallContext {
public:
- ServerCallContext(ServerPromiseBasedCall* call,
- const void* server_stream_data)
- : call_(call), server_stream_data_(server_stream_data) {}
- ArenaPromise MakeTopOfServerCallPromise(
+ virtual void PublishInitialMetadata(
+ ClientMetadataHandle metadata,
+ grpc_metadata_array* publish_initial_metadata) = 0;
+
+ // Construct the top of the server call promise for the v2 filter stack.
+ // TODO(ctiller): delete when v3 is available.
+ virtual ArenaPromise MakeTopOfServerCallPromise(
CallArgs call_args, grpc_completion_queue* cq,
- grpc_metadata_array* publish_initial_metadata,
- absl::FunctionRef publish);
+ absl::FunctionRef publish) = 0;
// Server stream data as supplied by the transport (so we can link the
// transport stream up with the call again).
// TODO(ctiller): legacy API - once we move transports to promises we'll
// create the promise directly and not need to pass around this token.
- const void* server_stream_data() { return server_stream_data_; }
+ virtual const void* server_stream_data() = 0;
- private:
- ServerPromiseBasedCall* const call_;
- const void* const server_stream_data_;
+ protected:
+ ~ServerCallContext() = default;
};
// TODO(ctiller): move more call things into this type
class CallContext {
public:
- explicit CallContext(PromiseBasedCall* call) : call_(call) {}
+ explicit CallContext(BasicPromiseBasedCall* call) : call_(call) {}
// Update the deadline (if deadline < the current deadline).
void UpdateDeadline(Timestamp deadline);
@@ -135,13 +136,21 @@ class CallContext {
void set_traced(bool traced) { traced_ = traced; }
bool traced() const { return traced_; }
+ // TEMPORARY HACK
+ // Create a call spine object for this call.
+ // Said object should only be created once.
+ // Allows interop between the v2 call stack and the v3 (which is required by
+ // transports).
+ RefCountedPtr MakeCallSpine(CallArgs call_args);
+ grpc_call* c_call();
+
private:
friend class PromiseBasedCall;
// Call final info.
grpc_call_stats call_stats_;
// TODO(ctiller): remove this once transport APIs are promise based and we
// don't need refcounting here.
- PromiseBasedCall* const call_;
+ BasicPromiseBasedCall* const call_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
// Is this call traced?
bool traced_ = false;
@@ -150,6 +159,9 @@ class CallContext {
template <>
struct ContextType {};
+RefCountedPtr MakeServerCall(Server* server,
+ Channel* channel);
+
} // namespace grpc_core
// Create a new call based on \a args.
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 3406881f7fd..249dca33497 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -67,6 +67,7 @@
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -75,6 +76,7 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/wait_for_cq_end_op.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
@@ -82,57 +84,6 @@ namespace grpc_core {
TraceFlag grpc_server_channel_trace(false, "server_channel");
-//
-// Server::RequestedCall
-//
-
-struct Server::RequestedCall {
- enum class Type { BATCH_CALL, REGISTERED_CALL };
-
- RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
- grpc_call** call_arg, grpc_metadata_array* initial_md,
- grpc_call_details* details)
- : type(Type::BATCH_CALL),
- tag(tag_arg),
- cq_bound_to_call(call_cq),
- call(call_arg),
- initial_metadata(initial_md) {
- data.batch.details = details;
- }
-
- RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
- grpc_call** call_arg, grpc_metadata_array* initial_md,
- RegisteredMethod* rm, gpr_timespec* deadline,
- grpc_byte_buffer** optional_payload)
- : type(Type::REGISTERED_CALL),
- tag(tag_arg),
- cq_bound_to_call(call_cq),
- call(call_arg),
- initial_metadata(initial_md) {
- data.registered.method = rm;
- data.registered.deadline = deadline;
- data.registered.optional_payload = optional_payload;
- }
-
- MultiProducerSingleConsumerQueue::Node mpscq_node;
- const Type type;
- void* const tag;
- grpc_completion_queue* const cq_bound_to_call;
- grpc_call** const call;
- grpc_cq_completion completion;
- grpc_metadata_array* const initial_metadata;
- union {
- struct {
- grpc_call_details* details;
- } batch;
- struct {
- RegisteredMethod* method;
- gpr_timespec* deadline;
- grpc_byte_buffer** optional_payload;
- } registered;
- } data;
-};
-
//
// Server::RegisteredMethod
//
@@ -248,6 +199,87 @@ class Server::RequestMatcherInterface {
virtual Server* server() const = 0;
};
+//
+// Server::RequestedCall
+//
+
+struct Server::RequestedCall {
+ enum class Type { BATCH_CALL, REGISTERED_CALL };
+
+ RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
+ grpc_call** call_arg, grpc_metadata_array* initial_md,
+ grpc_call_details* details)
+ : type(Type::BATCH_CALL),
+ tag(tag_arg),
+ cq_bound_to_call(call_cq),
+ call(call_arg),
+ initial_metadata(initial_md) {
+ data.batch.details = details;
+ }
+
+ RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
+ grpc_call** call_arg, grpc_metadata_array* initial_md,
+ RegisteredMethod* rm, gpr_timespec* deadline,
+ grpc_byte_buffer** optional_payload)
+ : type(Type::REGISTERED_CALL),
+ tag(tag_arg),
+ cq_bound_to_call(call_cq),
+ call(call_arg),
+ initial_metadata(initial_md) {
+ data.registered.method = rm;
+ data.registered.deadline = deadline;
+ data.registered.optional_payload = optional_payload;
+ }
+
+ void Complete(NextResult payload, ClientMetadata& md) {
+ Timestamp deadline = GetContext()->deadline();
+ switch (type) {
+ case RequestedCall::Type::BATCH_CALL:
+ GPR_ASSERT(!payload.has_value());
+ data.batch.details->host =
+ CSliceRef(md.get_pointer(HttpAuthorityMetadata())->c_slice());
+ data.batch.details->method =
+ CSliceRef(md.Take(HttpPathMetadata())->c_slice());
+ data.batch.details->deadline =
+ deadline.as_timespec(GPR_CLOCK_MONOTONIC);
+ break;
+ case RequestedCall::Type::REGISTERED_CALL:
+ md.Remove(HttpPathMetadata());
+ *data.registered.deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
+ if (data.registered.optional_payload != nullptr) {
+ if (payload.has_value()) {
+ auto* sb = payload.value()->payload()->c_slice_buffer();
+ *data.registered.optional_payload =
+ grpc_raw_byte_buffer_create(sb->slices, sb->count);
+ } else {
+ *data.registered.optional_payload = nullptr;
+ }
+ }
+ break;
+ default:
+ GPR_UNREACHABLE_CODE(abort());
+ }
+ }
+
+ MultiProducerSingleConsumerQueue::Node mpscq_node;
+ const Type type;
+ void* const tag;
+ grpc_completion_queue* const cq_bound_to_call;
+ grpc_call** const call;
+ grpc_cq_completion completion;
+ grpc_metadata_array* const initial_metadata;
+ union {
+ struct {
+ grpc_call_details* details;
+ } batch;
+ struct {
+ RegisteredMethod* method;
+ gpr_timespec* deadline;
+ grpc_byte_buffer** optional_payload;
+ } registered;
+ } data;
+};
+
// The RealRequestMatcher is an implementation of RequestMatcherInterface that
// actually uses all the features of RequestMatcherInterface: expecting the
// application to explicitly request RPCs and then matching those to incoming
@@ -757,7 +789,9 @@ class ChannelBroadcaster {
const grpc_channel_filter Server::kServerTopFilter = {
Server::CallData::StartTransportStreamOpBatch,
Server::ChannelData::MakeCallPromise,
- /* init_call: */ nullptr,
+ [](grpc_channel_element*, CallSpineInterface*) {
+ // TODO(ctiller): remove the server filter when call-v3 is finalized
+ },
grpc_channel_next_op,
sizeof(Server::CallData),
Server::CallData::InitCallElement,
@@ -1275,16 +1309,31 @@ void Server::ChannelData::InitTransport(RefCountedPtr server,
}
// Start accept_stream transport op.
grpc_transport_op* op = grpc_make_transport_op(nullptr);
- op->set_accept_stream = true;
- op->set_accept_stream_fn = AcceptStream;
- if (IsRegisteredMethodLookupInTransportEnabled()) {
- op->set_registered_method_matcher_fn = [](void* arg,
- ClientMetadata* metadata) {
- static_cast(arg)->SetRegisteredMethodOnMetadata(*metadata);
- };
- }
- // op->set_registered_method_matcher_fn = Registered
- op->set_accept_stream_user_data = this;
+ int accept_stream_types = 0;
+ if (transport->filter_stack_transport() != nullptr) {
+ ++accept_stream_types;
+ op->set_accept_stream = true;
+ op->set_accept_stream_fn = AcceptStream;
+ if (IsRegisteredMethodLookupInTransportEnabled()) {
+ op->set_registered_method_matcher_fn = [](void* arg,
+ ClientMetadata* metadata) {
+ static_cast(arg)->SetRegisteredMethodOnMetadata(
+ *metadata);
+ };
+ }
+ op->set_accept_stream_user_data = this;
+ }
+ if (transport->server_transport() != nullptr) {
+ ++accept_stream_types;
+ transport->server_transport()->SetAcceptFunction(
+ [this](ClientMetadata& metadata) {
+ SetRegisteredMethodOnMetadata(metadata);
+ auto call = MakeServerCall(server_.get(), channel_.get());
+ InitCall(call);
+ return CallInitiator(std::move(call));
+ });
+ }
+ GPR_ASSERT(accept_stream_types == 1);
op->start_connectivity_watch = MakeOrphanable(this);
if (server_->ShutdownCalled()) {
op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
@@ -1368,17 +1417,89 @@ auto CancelledDueToServerShutdown() {
}
} // namespace
+void Server::ChannelData::InitCall(RefCountedPtr call) {
+ call->SpawnGuarded("request_matcher", [this, call]() {
+ return TrySeq(
+ // Wait for initial metadata to pass through all filters
+ Map(call->client_initial_metadata().receiver.Next(),
+ [](NextResult md)
+ -> absl::StatusOr {
+ if (!md.has_value()) {
+ return absl::InternalError("Missing metadata");
+ }
+ if (!md.value()->get_pointer(HttpPathMetadata())) {
+ return absl::InternalError("Missing :path header");
+ }
+ if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
+ return absl::InternalError("Missing :authority header");
+ }
+ return std::move(*md);
+ }),
+ // Match request with requested call
+ [this, call](ClientMetadataHandle md) {
+ auto* registered_method = static_cast(
+ md->get(GrpcRegisteredMethod()).value_or(nullptr));
+ RequestMatcherInterface* rm;
+ grpc_server_register_method_payload_handling payload_handling =
+ GRPC_SRM_PAYLOAD_NONE;
+ if (registered_method == nullptr) {
+ rm = server_->unregistered_request_matcher_.get();
+ } else {
+ rm = registered_method->matcher.get();
+ }
+ auto maybe_read_first_message = If(
+ payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
+ [call]() {
+ return call->client_to_server_messages().receiver.Next();
+ },
+ []() -> NextResult {
+ return NextResult();
+ });
+ return TryJoin(
+ Map(std::move(maybe_read_first_message),
+ [](NextResult n) {
+ return ValueOrFailure>{
+ std::move(n)};
+ }),
+ rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable {
+ return ValueOrFailure(std::move(md));
+ });
+ },
+ // Publish call to cq
+ [](std::tuple,
+ RequestMatcherInterface::MatchResult,
+ ClientMetadataHandle>
+ r) {
+ RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
+ auto md = std::move(std::get<2>(r));
+ auto* rc = mr.TakeCall();
+ rc->Complete(std::move(std::get<0>(r)), *md);
+ auto* call_context = GetContext();
+ *rc->call = call_context->c_call();
+ grpc_call_set_completion_queue(call_context->c_call(),
+ rc->cq_bound_to_call);
+ call_context->server_call_context()->PublishInitialMetadata(
+ std::move(md), rc->initial_metadata);
+ // TODO(ctiller): publish metadata
+ return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
+ [rc = std::unique_ptr(rc)](Empty) {
+ return absl::OkStatus();
+ });
+ });
+ });
+}
+
ArenaPromise Server::ChannelData::MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
auto* chand = static_cast(elem->channel_data);
auto* server = chand->server_.get();
- absl::optional path =
- call_args.client_initial_metadata->Take(HttpPathMetadata());
if (server->ShutdownCalled()) return CancelledDueToServerShutdown();
auto cleanup_ref =
absl::MakeCleanup([server] { server->ShutdownUnrefOnRequest(); });
if (!server->ShutdownRefOnRequest()) return CancelledDueToServerShutdown();
- if (!path.has_value()) {
+ auto path_ptr =
+ call_args.client_initial_metadata->get_pointer(HttpPathMetadata());
+ if (path_ptr == nullptr) {
return [] {
return ServerMetadataFromStatus(
absl::InternalError("Missing :path header"));
@@ -1392,7 +1513,6 @@ ArenaPromise Server::ChannelData::MakeCallPromise(
absl::InternalError("Missing :authority header"));
};
}
- Timestamp deadline = GetContext()->deadline();
// Find request matcher.
RequestMatcherInterface* matcher;
RegisteredMethod* rm = nullptr;
@@ -1402,7 +1522,7 @@ ArenaPromise Server::ChannelData::MakeCallPromise(
.value_or(nullptr));
} else {
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(),
- path->as_string_view());
+ path_ptr->as_string_view());
}
ArenaPromise>>
maybe_read_first_message([] { return NextResult(); });
@@ -1439,8 +1559,7 @@ ArenaPromise Server::ChannelData::MakeCallPromise(
return std::make_pair(std::move(*mr), std::move(payload));
});
},
- [host_ptr, path = std::move(path), deadline,
- call_args =
+ [call_args =
std::move(call_args)](std::pair>
r) mutable {
@@ -1448,41 +1567,19 @@ ArenaPromise Server::ChannelData::MakeCallPromise(
auto& payload = r.second;
auto* rc = mr.TakeCall();
auto* cq_for_new_request = mr.cq();
- switch (rc->type) {
- case RequestedCall::Type::BATCH_CALL:
- GPR_ASSERT(!payload.has_value());
- rc->data.batch.details->host = CSliceRef(host_ptr->c_slice());
- rc->data.batch.details->method = CSliceRef(path->c_slice());
- rc->data.batch.details->deadline =
- deadline.as_timespec(GPR_CLOCK_MONOTONIC);
- break;
- case RequestedCall::Type::REGISTERED_CALL:
- *rc->data.registered.deadline =
- deadline.as_timespec(GPR_CLOCK_MONOTONIC);
- if (rc->data.registered.optional_payload != nullptr) {
- if (payload.has_value()) {
- auto* sb = payload.value()->payload()->c_slice_buffer();
- *rc->data.registered.optional_payload =
- grpc_raw_byte_buffer_create(sb->slices, sb->count);
- } else {
- *rc->data.registered.optional_payload = nullptr;
- }
- }
- break;
- default:
- GPR_UNREACHABLE_CODE(abort());
- }
- return GetContext()
- ->server_call_context()
- ->MakeTopOfServerCallPromise(
- std::move(call_args), rc->cq_bound_to_call,
- rc->initial_metadata,
- [rc, cq_for_new_request](grpc_call* call) {
- *rc->call = call;
- grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(),
- Server::DoneRequestEvent, rc, &rc->completion,
- true);
- });
+ auto* server_call_context =
+ GetContext()->server_call_context();
+ rc->Complete(std::move(payload), *call_args.client_initial_metadata);
+ server_call_context->PublishInitialMetadata(
+ std::move(call_args.client_initial_metadata), rc->initial_metadata);
+ return server_call_context->MakeTopOfServerCallPromise(
+ std::move(call_args), rc->cq_bound_to_call,
+ [rc, cq_for_new_request](grpc_call* call) {
+ *rc->call = call;
+ grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(),
+ Server::DoneRequestEvent, rc, &rc->completion,
+ true);
+ });
});
}
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 226fb9fc132..83d5bc6a8d3 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -239,6 +239,7 @@ class Server : public InternallyRefCounted,
static void DestroyChannelElement(grpc_channel_element* elem);
static ArenaPromise MakeCallPromise(
grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory);
+ void InitCall(RefCountedPtr call);
private:
class ConnectivityWatcher;
diff --git a/src/core/lib/surface/wait_for_cq_end_op.h b/src/core/lib/surface/wait_for_cq_end_op.h
new file mode 100644
index 00000000000..896e629503c
--- /dev/null
+++ b/src/core/lib/surface/wait_for_cq_end_op.h
@@ -0,0 +1,94 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
+#define GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
+
+#include
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/surface/completion_queue.h"
+
+namespace grpc_core {
+
+// Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits
+// for the callback supplied to grpc_cq_end_op() to be called, before resolving
+// to Empty{}
+class WaitForCqEndOp {
+ public:
+ WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error,
+ grpc_completion_queue* cq)
+ : state_{NotStarted{is_closure, tag, std::move(error), cq}} {}
+
+ Poll operator()() {
+ if (auto* n = absl::get_if(&state_)) {
+ if (n->is_closure) {
+ ExecCtx::Run(DEBUG_LOCATION, static_cast(n->tag),
+ std::move(n->error));
+ return Empty{};
+ } else {
+ auto not_started = std::move(*n);
+ auto& started =
+ state_.emplace(Activity::current()->MakeOwningWaker());
+ grpc_cq_end_op(
+ not_started.cq, not_started.tag, std::move(not_started.error),
+ [](void* p, grpc_cq_completion*) {
+ auto started = static_cast(p);
+ started->done.store(true, std::memory_order_release);
+ },
+ &started, &started.completion);
+ }
+ }
+ auto& started = absl::get(state_);
+ if (started.done.load(std::memory_order_acquire)) {
+ return Empty{};
+ } else {
+ return Pending{};
+ }
+ }
+
+ WaitForCqEndOp(const WaitForCqEndOp&) = delete;
+ WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete;
+ WaitForCqEndOp(WaitForCqEndOp&& other) noexcept
+ : state_(std::move(absl::get(other.state_))) {
+ other.state_.emplace();
+ }
+ WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept {
+ state_ = std::move(absl::get(other.state_));
+ other.state_.emplace();
+ return *this;
+ }
+
+ private:
+ struct NotStarted {
+ bool is_closure;
+ void* tag;
+ grpc_error_handle error;
+ grpc_completion_queue* cq;
+ };
+ struct Started {
+ explicit Started(Waker waker) : waker(std::move(waker)) {}
+ Waker waker;
+ grpc_cq_completion completion;
+ std::atomic done{false};
+ };
+ struct Invalid {};
+ using State = absl::variant;
+ State state_{Invalid{}};
+};
+
+} // namespace grpc_core
+
+#endif // GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 4beb800367f..c9f138c8f09 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -329,6 +329,8 @@ class CallSpineInterface {
class CallSpine final : public CallSpineInterface {
public:
+ CallSpine() { Crash("unimplemented"); }
+
Pipe& client_initial_metadata() override {
return client_initial_metadata_;
}
@@ -366,7 +368,7 @@ class CallSpine final : public CallSpineInterface {
class CallInitiator {
public:
- explicit CallInitiator(RefCountedPtr spine)
+ explicit CallInitiator(RefCountedPtr spine)
: spine_(std::move(spine)) {}
auto PushClientInitialMetadata(ClientMetadataHandle md) {
@@ -427,12 +429,12 @@ class CallInitiator {
}
private:
- const RefCountedPtr spine_;
+ const RefCountedPtr spine_;
};
class CallHandler {
public:
- explicit CallHandler(RefCountedPtr