diff --git a/BUILD b/BUILD
index ec4cc9a4dc1..9f064f92db9 100644
--- a/BUILD
+++ b/BUILD
@@ -3470,6 +3470,7 @@ grpc_cc_library(
"//src/core:percent_encoding",
"//src/core:pipe",
"//src/core:poll",
+ "//src/core:prioritized_race",
"//src/core:race",
"//src/core:slice",
"//src/core:slice_buffer",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2e3a0a3f476..a613d7f5af7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1115,6 +1115,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
+ add_dependencies(buildtests_cxx prioritized_race_test)
add_dependencies(buildtests_cxx promise_factory_test)
add_dependencies(buildtests_cxx promise_map_test)
add_dependencies(buildtests_cxx promise_test)
@@ -16703,6 +16704,43 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
+add_executable(prioritized_race_test
+ test/core/promise/prioritized_race_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+)
+target_compile_features(prioritized_race_test PUBLIC cxx_std_14)
+target_include_directories(prioritized_race_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(prioritized_race_test
+ ${_gRPC_BASELIB_LIBRARIES}
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ZLIB_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ gpr
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
add_executable(promise_factory_test
test/core/promise/promise_factory_test.cc
third_party/googletest/googletest/src/gtest-all.cc
diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl
index c5711abf88a..7199dd9b836 100644
--- a/bazel/experiments.bzl
+++ b/bazel/experiments.bzl
@@ -28,6 +28,9 @@ EXPERIMENTS = {
"promise_based_client_call",
"promise_based_server_call",
],
+ "cpp_end2end_test": [
+ "promise_based_server_call",
+ ],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@@ -51,6 +54,9 @@ EXPERIMENTS = {
"memory_pressure_controller",
"unconstrained_max_quota_buffer_size",
],
+ "xds_end2end_test": [
+ "promise_based_server_call",
+ ],
},
"on": {
"flow_control_test": [
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index f882ae81b9f..908772d963b 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -845,6 +845,7 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@@ -2204,6 +2205,7 @@ libs:
- src/core/lib/promise/party.h
- src/core/lib/promise/pipe.h
- src/core/lib/promise/poll.h
+ - src/core/lib/promise/prioritized_race.h
- src/core/lib/promise/promise.h
- src/core/lib/promise/race.h
- src/core/lib/promise/seq.h
@@ -10109,6 +10111,18 @@ targets:
- linux
- posix
- mac
+- name: prioritized_race_test
+ gtest: true
+ build: test
+ language: c++
+ headers:
+ - src/core/lib/promise/poll.h
+ - src/core/lib/promise/prioritized_race.h
+ src:
+ - test/core/promise/prioritized_race_test.cc
+ deps:
+ - gpr
+ uses_polling: false
- name: promise_factory_test
gtest: true
build: test
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 3f6bace55fd..985901968a4 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -940,6 +940,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
@@ -1966,6 +1967,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index a6aef9ffd63..e9a5016206b 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1522,6 +1522,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
@@ -2685,6 +2686,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/party.h',
'src/core/lib/promise/pipe.h',
'src/core/lib/promise/poll.h',
+ 'src/core/lib/promise/prioritized_race.h',
'src/core/lib/promise/promise.h',
'src/core/lib/promise/race.h',
'src/core/lib/promise/seq.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 5668a2525ff..bb4ef553ede 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1428,6 +1428,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/party.h )
s.files += %w( src/core/lib/promise/pipe.h )
s.files += %w( src/core/lib/promise/poll.h )
+ s.files += %w( src/core/lib/promise/prioritized_race.h )
s.files += %w( src/core/lib/promise/promise.h )
s.files += %w( src/core/lib/promise/race.h )
s.files += %w( src/core/lib/promise/seq.h )
diff --git a/package.xml b/package.xml
index 79a4700ff39..538d23c92a8 100644
--- a/package.xml
+++ b/package.xml
@@ -1410,6 +1410,7 @@
+
diff --git a/src/core/BUILD b/src/core/BUILD
index 7895a6d417a..db047bc876f 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -573,6 +573,13 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
+grpc_cc_library(
+ name = "prioritized_race",
+ language = "c++",
+ public_hdrs = ["lib/promise/prioritized_race.h"],
+ deps = ["//:gpr_platform"],
+)
+
grpc_cc_library(
name = "loop",
external_deps = [
diff --git a/src/core/ext/filters/http/message_compress/compression_filter.cc b/src/core/ext/filters/http/message_compress/compression_filter.cc
index 8392afcbb90..26868c51ef1 100644
--- a/src/core/ext/filters/http/message_compress/compression_filter.cc
+++ b/src/core/ext/filters/http/message_compress/compression_filter.cc
@@ -49,7 +49,7 @@
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
-#include "src/core/lib/promise/race.h"
+#include "src/core/lib/promise/prioritized_race.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
@@ -273,8 +273,8 @@ ArenaPromise ClientCompressionFilter::MakeCallPromise(
return std::move(*r);
});
// Run the next filter, and race it with getting an error from decompression.
- return Race(decompress_err->Wait(),
- next_promise_factory(std::move(call_args)));
+ return PrioritizedRace(decompress_err->Wait(),
+ next_promise_factory(std::move(call_args)));
}
ArenaPromise ServerCompressionFilter::MakeCallPromise(
@@ -316,8 +316,8 @@ ArenaPromise ServerCompressionFilter::MakeCallPromise(
return CompressMessage(std::move(message), *compression_algorithm);
});
// Run the next filter, and race it with getting an error from decompression.
- return Race(decompress_err->Wait(),
- next_promise_factory(std::move(call_args)));
+ return PrioritizedRace(decompress_err->Wait(),
+ next_promise_factory(std::move(call_args)));
}
} // namespace grpc_core
diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml
index 586ebb46fb2..129400eb3d2 100644
--- a/src/core/lib/experiments/experiments.yaml
+++ b/src/core/lib/experiments/experiments.yaml
@@ -112,9 +112,7 @@
default: false
expiry: 2023/06/01
owner: ctiller@google.com
- test_tags: ["core_end2end_test"]
- # removing these until I can stabilize tests a little further
- disabled_test_tags: ["cpp_end2end_test", "xds_end2end_test"]
+ test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test"]
- name: transport_supplies_client_latency
description: If set, use the transport represented value for client latency in opencensus
default: false
diff --git a/src/core/lib/promise/prioritized_race.h b/src/core/lib/promise/prioritized_race.h
new file mode 100644
index 00000000000..0a6ab57880d
--- /dev/null
+++ b/src/core/lib/promise/prioritized_race.h
@@ -0,0 +1,95 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H
+#define GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H
+
+#include
+
+#include
+#include
+
+namespace grpc_core {
+
+namespace promise_detail {
+
+template
+class TwoPartyPrioritizedRace {
+ public:
+ using Result = decltype(std::declval()());
+
+ explicit TwoPartyPrioritizedRace(A a, B b)
+ : a_(std::move(a)), b_(std::move(b)) {}
+
+ Result operator()() {
+ // Check the priority promise.
+ auto p = a_();
+ if (p.ready()) return p;
+ // Check the other promise.
+ p = b_();
+ if (p.ready()) {
+ // re-poll a to see if it's also completed.
+ auto q = a_();
+ if (q.ready()) {
+ // both are ready, but a is prioritized
+ return q;
+ }
+ }
+ return p;
+ }
+
+ private:
+ A a_;
+ B b_;
+};
+
+template
+class PrioritizedRace;
+
+template
+class PrioritizedRace
+ : public TwoPartyPrioritizedRace> {
+ public:
+ using Result = decltype(std::declval()());
+ explicit PrioritizedRace(Promise promise, Promises... promises)
+ : TwoPartyPrioritizedRace>(
+ std::move(promise),
+ PrioritizedRace(std::move(promises)...)) {}
+};
+
+template
+class PrioritizedRace {
+ public:
+ using Result = decltype(std::declval()());
+ explicit PrioritizedRace(Promise promise) : promise_(std::move(promise)) {}
+ Result operator()() { return promise_(); }
+
+ private:
+ Promise promise_;
+};
+
+} // namespace promise_detail
+
+/// Run all the promises until one is non-pending.
+/// Once there's a non-pending promise, repoll all the promises before that.
+/// Return the result from the lexically first non-pending promise.
+template
+promise_detail::PrioritizedRace PrioritizedRace(
+ Promises... promises) {
+ return promise_detail::PrioritizedRace(std::move(promises)...);
+}
+
+} // namespace grpc_core
+
+#endif // GRPC_SRC_CORE_LIB_PROMISE_PRIORITIZED_RACE_H
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 0ac76543e24..1f689ce03ab 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -2183,7 +2183,7 @@ class PromiseBasedCall : public Call,
void StartRecvMessage(const grpc_op& op, const Completion& completion,
FirstPromise first,
PipeReceiver* receiver,
- Party::BulkSpawner& spawner);
+ bool cancel_on_error, Party::BulkSpawner& spawner);
void StartSendMessage(const grpc_op& op, const Completion& completion,
PipeSender* sender,
Party::BulkSpawner& spawner);
@@ -2538,7 +2538,8 @@ template
void PromiseBasedCall::StartRecvMessage(
const grpc_op& op, const Completion& completion,
FirstPromiseFactory first_promise_factory,
- PipeReceiver* receiver, Party::BulkSpawner& spawner) {
+ PipeReceiver* receiver, bool cancel_on_error,
+ Party::BulkSpawner& spawner) {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[call] Start RecvMessage: %s", DebugTag().c_str(),
CompletionString(completion).c_str());
@@ -2549,7 +2550,7 @@ void PromiseBasedCall::StartRecvMessage(
[first_promise_factory = std::move(first_promise_factory), receiver]() {
return Seq(first_promise_factory(), receiver->Next());
},
- [this,
+ [this, cancel_on_error,
completion = AddOpToCompletion(completion, PendingOp::kReceiveMessage)](
NextResult result) mutable {
if (result.has_value()) {
@@ -2580,6 +2581,7 @@ void PromiseBasedCall::StartRecvMessage(
}
failed_before_recv_message_.store(true);
FailCompletion(completion);
+ if (cancel_on_error) CancelWithError(absl::CancelledError());
*recv_message_ = nullptr;
} else {
if (grpc_call_trace.enabled()) {
@@ -2873,7 +2875,7 @@ void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
[this]() {
return server_initial_metadata_.receiver.AwaitClosed();
},
- &server_to_client_messages_.receiver, spawner);
+ &server_to_client_messages_.receiver, false, spawner);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
spawner.Spawn(
@@ -3328,7 +3330,7 @@ void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
}
StartRecvMessage(
op, completion, []() { return []() { return Empty{}; }; },
- client_to_server_messages_, spawner);
+ client_to_server_messages_, true, spawner);
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
auto metadata = arena()->MakePooled(arena());
diff --git a/test/core/promise/BUILD b/test/core/promise/BUILD
index baaade68659..f34b1d68b4c 100644
--- a/test/core/promise/BUILD
+++ b/test/core/promise/BUILD
@@ -155,6 +155,20 @@ grpc_cc_test(
],
)
+grpc_cc_test(
+ name = "prioritized_race_test",
+ srcs = ["prioritized_race_test.cc"],
+ external_deps = ["gtest"],
+ language = "c++",
+ tags = ["promise_test"],
+ uses_event_engine = False,
+ uses_polling = False,
+ deps = [
+ "//src/core:poll",
+ "//src/core:prioritized_race",
+ ],
+)
+
grpc_cc_test(
name = "promise_factory_test",
srcs = ["promise_factory_test.cc"],
diff --git a/test/core/promise/prioritized_race_test.cc b/test/core/promise/prioritized_race_test.cc
new file mode 100644
index 00000000000..e181425c678
--- /dev/null
+++ b/test/core/promise/prioritized_race_test.cc
@@ -0,0 +1,80 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/core/lib/promise/prioritized_race.h"
+
+#include "gtest/gtest.h"
+
+#include "src/core/lib/promise/poll.h"
+
+namespace grpc_core {
+
+Poll instant() { return 1; }
+Poll never() { return Pending(); }
+
+TEST(PrioritizedRaceTest, Race1) {
+ EXPECT_EQ(PrioritizedRace(instant)(), Poll(1));
+}
+
+TEST(PrioritizedRaceTest, Race2A) {
+ EXPECT_EQ(PrioritizedRace(instant, never)(), Poll(1));
+}
+
+TEST(PrioritizedRaceTest, Race2B) {
+ EXPECT_EQ(PrioritizedRace(never, instant)(), Poll(1));
+}
+
+TEST(PrioritizedRaceTest, PrioritizedCompletion2A) {
+ int first_polls = 0;
+ int second_polls = 0;
+ auto r = PrioritizedRace(
+ [&first_polls]() -> Poll {
+ ++first_polls;
+ return 1;
+ },
+ [&second_polls]() {
+ ++second_polls;
+ return 2;
+ })();
+ EXPECT_EQ(r, Poll(1));
+ // First promise completes immediately, so second promise is never polled.
+ EXPECT_EQ(first_polls, 1);
+ EXPECT_EQ(second_polls, 0);
+}
+
+TEST(PrioritizedRaceTest, PrioritizedCompletion2B) {
+ int first_polls = 0;
+ int second_polls = 0;
+ auto r = PrioritizedRace(
+ [&first_polls]() -> Poll {
+ ++first_polls;
+ if (first_polls > 1) return 1;
+ return Pending{};
+ },
+ [&second_polls]() {
+ ++second_polls;
+ return 2;
+ })();
+ EXPECT_EQ(r, Poll(1));
+ // First promise completes after second promise is polled.
+ EXPECT_EQ(first_polls, 2);
+ EXPECT_EQ(second_polls, 1);
+}
+
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 15b4b2ba33d..71cabb0ef0e 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -2424,6 +2424,7 @@ src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
+src/core/lib/promise/prioritized_race.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 4c2aa741f56..2d93ddf2e72 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -2205,6 +2205,7 @@ src/core/lib/promise/party.cc \
src/core/lib/promise/party.h \
src/core/lib/promise/pipe.h \
src/core/lib/promise/poll.h \
+src/core/lib/promise/prioritized_race.h \
src/core/lib/promise/promise.h \
src/core/lib/promise/race.h \
src/core/lib/promise/seq.h \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 932383ca957..74dea7183a4 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -5561,6 +5561,30 @@
],
"uses_polling": true
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "prioritized_race_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": false
+ },
{
"args": [],
"benchmark": false,