From 1e4fb21b7cfb5bc1b50cfb445e0ae4dd8a0a68f9 Mon Sep 17 00:00:00 2001 From: Esun Kim Date: Wed, 27 Dec 2023 09:32:51 -0800 Subject: [PATCH 1/5] Fix Bazel 7 build (more) (#35392) It turned out that the previous change missed two things which this PR has - Fix function `_dockerized_genrule` not to have `timeout` and `flaky` which Bazel doesn't have. (Bazel 7 may either drop these arguments or become more strict about passing unrecognized ones) - Disabled python bazel distribution tests with Bazel 7. This needs to be addressed by https://github.com/grpc/grpc/issues/35391. --- tools/bazelify_tests/build_defs.bzl | 6 ++---- tools/bazelify_tests/test/bazel_distribtests.bzl | 3 +++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tools/bazelify_tests/build_defs.bzl b/tools/bazelify_tests/build_defs.bzl index 4a4662a72a5..c8b936e06c5 100644 --- a/tools/bazelify_tests/build_defs.bzl +++ b/tools/bazelify_tests/build_defs.bzl @@ -63,7 +63,7 @@ def _dockerized_sh_test(name, srcs = [], args = [], data = [], size = "medium", **test_args ) -def _dockerized_genrule(name, cmd, outs, srcs = [], timeout = None, tags = [], exec_compatible_with = [], flaky = None, docker_image_version = None, docker_run_as_root = False): +def _dockerized_genrule(name, cmd, outs, srcs = [], tags = [], exec_compatible_with = [], docker_image_version = None, docker_run_as_root = False): """Runs genrule under docker either via RBE or via docker sandbox.""" if docker_image_version: image_spec = DOCKERIMAGE_CURRENT_VERSIONS.get(docker_image_version, None) @@ -94,8 +94,6 @@ def _dockerized_genrule(name, cmd, outs, srcs = [], timeout = None, tags = [], e "cmd": cmd, "srcs": srcs, "tags": tags, - "flaky": flaky, - "timeout": timeout, "exec_compatible_with": exec_compatible_with, "exec_properties": exec_properties, "outs": outs, @@ -292,7 +290,7 @@ def grpc_build_artifact_task(name, timeout = None, artifact_deps = [], tags = [] cmd = cmd + " $(location " + dep_archive_name + ")" genrule_srcs.append(dep_archive_name) - _dockerized_genrule(name = name, cmd = cmd, outs = genrule_outs, srcs = genrule_srcs, timeout = timeout, tags = tags, exec_compatible_with = exec_compatible_with, flaky = flaky, docker_image_version = docker_image_version, docker_run_as_root = False) + _dockerized_genrule(name = name, cmd = cmd, outs = genrule_outs, srcs = genrule_srcs, tags = tags, exec_compatible_with = exec_compatible_with, docker_image_version = docker_image_version, docker_run_as_root = False) # The genrule above always succeeds (even if the underlying build fails), so that we can create rules that depend # on multiple artifact builds (of which some can fail). The actual build status (exitcode) and the log of the build diff --git a/tools/bazelify_tests/test/bazel_distribtests.bzl b/tools/bazelify_tests/test/bazel_distribtests.bzl index 7098fc2e461..3a6639750fd 100644 --- a/tools/bazelify_tests/test/bazel_distribtests.bzl +++ b/tools/bazelify_tests/test/bazel_distribtests.bzl @@ -35,6 +35,9 @@ def generate_bazel_distribtests(name): for bazel_version in SUPPORTED_BAZEL_VERSIONS: for shard_name in _TEST_SHARDS: + # TODO(https://github.com/grpc/grpc/issues/35391): Fix bazel 7 + distribtest_python test + if bazel_version.startswith("7") and shard_name == "distribtest_python": + continue test_name = "bazel_distribtest_%s_%s" % (bazel_version, shard_name) grpc_run_bazel_distribtest_test( name = test_name, From b25a382b602aabbd99439261f88cf05728988207 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Wed, 27 Dec 2023 10:31:06 -0800 Subject: [PATCH 2/5] [EventEngine] Adjust visibility of event_engine_base_hdrs and //src/core:useful targets PiperOrigin-RevId: 594060115 --- BUILD | 1 + bazel/grpc_build_system.bzl | 2 ++ src/core/BUILD | 1 + 3 files changed, 4 insertions(+) diff --git a/BUILD b/BUILD index 5a3deed5bc3..2d7d69fcb44 100644 --- a/BUILD +++ b/BUILD @@ -1274,6 +1274,7 @@ grpc_cc_library( tags = [ "nofixdeps", ], + visibility = ["@grpc:event_engine_base_hdrs"], deps = [ "channel_arg_names", "gpr", diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index cc0801a4576..f9cee08a186 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -120,6 +120,8 @@ def _update_visibility(visibility): "xds": PRIVATE, "xds_client_core": PRIVATE, "grpc_python_observability": PRIVATE, + "event_engine_base_hdrs": PRIVATE, + "useful": PRIVATE, } final_visibility = [] for rule in visibility: diff --git a/src/core/BUILD b/src/core/BUILD index 523fae21830..1c9774b816b 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -155,6 +155,7 @@ grpc_cc_library( "absl/types:variant", ], language = "c++", + visibility = ["@grpc:useful"], deps = ["//:gpr_platform"], ) From f703530b6d45421c8f4c5872b24da970071388c1 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 27 Dec 2023 17:43:27 -0800 Subject: [PATCH 3/5] [call-v3] initial surface call integration (#35312) Adds temporary `call.cc` and `connected_channel.cc` scaffolding to run `CallInterceptor`/`CallHandler` style calls. This will get ripped out as soon as the v3 transition is completed. Closes #35312 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35312 from ctiller:v3-accept ae0bf81f8b1c2b23980fcedde6139a6afa219b97 PiperOrigin-RevId: 594128029 --- BUILD | 3 + CMakeLists.txt | 2 +- Package.swift | 4 + build_autogenerated.yaml | 27 +- gRPC-C++.podspec | 8 + gRPC-Core.podspec | 8 + grpc.gemspec | 4 + package.xml | 4 + src/core/BUILD | 3 + .../chaotic_good/client_transport.cc | 4 +- .../transport/chaotic_good/client_transport.h | 2 +- src/core/lib/channel/channel_stack.cc | 10 + src/core/lib/channel/connected_channel.cc | 81 +- src/core/lib/promise/detail/promise_factory.h | 1 + src/core/lib/promise/latch.h | 2 + src/core/lib/promise/status_flag.h | 49 +- src/core/lib/promise/try_join.h | 45 +- src/core/lib/promise/try_seq.h | 4 +- src/core/lib/surface/call.cc | 862 ++++++++++++++---- src/core/lib/surface/call.h | 38 +- src/core/lib/surface/server.cc | 305 ++++--- src/core/lib/surface/server.h | 1 + src/core/lib/surface/wait_for_cq_end_op.h | 94 ++ src/core/lib/transport/transport.h | 12 +- test/core/promise/status_flag_test.cc | 5 +- test/core/promise/try_join_test.cc | 150 ++- test/core/promise/try_seq_test.cc | 138 ++- tools/doxygen/Doxyfile.c++.internal | 4 + tools/doxygen/Doxyfile.core.internal | 4 + 29 files changed, 1438 insertions(+), 436 deletions(-) create mode 100644 src/core/lib/surface/wait_for_cq_end_op.h diff --git a/BUILD b/BUILD index 2d7d69fcb44..3cbb84fcc1d 100644 --- a/BUILD +++ b/BUILD @@ -1464,6 +1464,7 @@ grpc_cc_library( "//src/core:lib/surface/lame_client.h", "//src/core:lib/surface/server.h", "//src/core:lib/surface/validate_metadata.h", + "//src/core:lib/surface/wait_for_cq_end_op.h", "//src/core:lib/transport/batch_builder.h", "//src/core:lib/transport/connectivity_state.h", "//src/core:lib/transport/custom_metadata.h", @@ -1540,6 +1541,7 @@ grpc_cc_library( "work_serializer", "//src/core:1999", "//src/core:activity", + "//src/core:all_ok", "//src/core:arena", "//src/core:arena_promise", "//src/core:atomic_utils", @@ -1612,6 +1614,7 @@ grpc_cc_library( "//src/core:thread_quota", "//src/core:time", "//src/core:transport_fwd", + "//src/core:try_join", "//src/core:try_seq", "//src/core:type_list", "//src/core:useful", diff --git a/CMakeLists.txt b/CMakeLists.txt index 60817e90943..a35b971bd1b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23456,8 +23456,8 @@ target_include_directories(status_flag_test target_link_libraries(status_flag_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest - absl::status absl::statusor + gpr ) diff --git a/Package.swift b/Package.swift index e2c68c7eefe..7fa98e64126 100644 --- a/Package.swift +++ b/Package.swift @@ -1634,10 +1634,12 @@ let package = Package( "src/core/lib/matchers/matchers.h", "src/core/lib/promise/activity.cc", "src/core/lib/promise/activity.h", + "src/core/lib/promise/all_ok.h", "src/core/lib/promise/arena_promise.h", "src/core/lib/promise/cancel_callback.h", "src/core/lib/promise/context.h", "src/core/lib/promise/detail/basic_seq.h", + "src/core/lib/promise/detail/join_state.h", "src/core/lib/promise/detail/promise_factory.h", "src/core/lib/promise/detail/promise_like.h", "src/core/lib/promise/detail/seq_state.h", @@ -1662,6 +1664,7 @@ let package = Package( "src/core/lib/promise/status_flag.h", "src/core/lib/promise/trace.cc", "src/core/lib/promise/trace.h", + "src/core/lib/promise/try_join.h", "src/core/lib/promise/try_seq.h", "src/core/lib/resolver/endpoint_addresses.cc", "src/core/lib/resolver/endpoint_addresses.h", @@ -1869,6 +1872,7 @@ let package = Package( "src/core/lib/surface/validate_metadata.cc", "src/core/lib/surface/validate_metadata.h", "src/core/lib/surface/version.cc", + "src/core/lib/surface/wait_for_cq_end_op.h", "src/core/lib/transport/batch_builder.cc", "src/core/lib/transport/batch_builder.h", "src/core/lib/transport/bdp_estimator.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 71a87f34adb..2e7cd202f5c 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1033,10 +1033,12 @@ libs: - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/matchers/matchers.h - src/core/lib/promise/activity.h + - src/core/lib/promise/all_ok.h - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/seq_state.h @@ -1058,6 +1060,7 @@ libs: - src/core/lib/promise/sleep.h - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/endpoint_addresses.h - src/core/lib/resolver/resolver.h @@ -1156,6 +1159,7 @@ libs: - src/core/lib/surface/lame_client.h - src/core/lib/surface/server.h - src/core/lib/surface/validate_metadata.h + - src/core/lib/surface/wait_for_cq_end_op.h - src/core/lib/transport/batch_builder.h - src/core/lib/transport/bdp_estimator.h - src/core/lib/transport/connectivity_state.h @@ -2490,10 +2494,12 @@ libs: - src/core/lib/load_balancing/lb_policy_registry.h - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/promise/activity.h + - src/core/lib/promise/all_ok.h - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/seq_state.h @@ -2515,6 +2521,7 @@ libs: - src/core/lib/promise/sleep.h - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/endpoint_addresses.h - src/core/lib/resolver/resolver.h @@ -2582,6 +2589,7 @@ libs: - src/core/lib/surface/lame_client.h - src/core/lib/surface/server.h - src/core/lib/surface/validate_metadata.h + - src/core/lib/surface/wait_for_cq_end_op.h - src/core/lib/transport/batch_builder.h - src/core/lib/transport/bdp_estimator.h - src/core/lib/transport/connectivity_state.h @@ -4624,10 +4632,12 @@ libs: - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/matchers/matchers.h - src/core/lib/promise/activity.h + - src/core/lib/promise/all_ok.h - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/seq_state.h @@ -4647,6 +4657,7 @@ libs: - src/core/lib/promise/seq.h - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/endpoint_addresses.h - src/core/lib/resolver/resolver.h @@ -4716,6 +4727,7 @@ libs: - src/core/lib/surface/lame_client.h - src/core/lib/surface/server.h - src/core/lib/surface/validate_metadata.h + - src/core/lib/surface/wait_for_cq_end_op.h - src/core/lib/transport/batch_builder.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h @@ -7638,12 +7650,10 @@ targets: - src/core/ext/transport/chaotic_good/client_transport.h - src/core/ext/transport/chaotic_good/frame.h - src/core/ext/transport/chaotic_good/frame_header.h - - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h - src/core/lib/promise/inter_activity_pipe.h - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h - src/core/lib/transport/promise_endpoint.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -7668,12 +7678,10 @@ targets: - src/core/ext/transport/chaotic_good/client_transport.h - src/core/ext/transport/chaotic_good/frame.h - src/core/ext/transport/chaotic_good/frame_header.h - - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/event_engine_wakeup_scheduler.h - src/core/lib/promise/inter_activity_pipe.h - src/core/lib/promise/join.h - src/core/lib/promise/mpsc.h - - src/core/lib/promise/try_join.h - src/core/lib/promise/wait_set.h - src/core/lib/transport/promise_endpoint.h - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h @@ -12921,7 +12929,6 @@ targets: build: test language: c++ headers: - - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/join.h - test/core/promise/test_wakeup_schedulers.h src: @@ -13122,7 +13129,6 @@ targets: build: test language: c++ headers: - - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/join.h - src/core/lib/transport/promise_endpoint.h - test/core/promise/test_wakeup_schedulers.h @@ -16126,8 +16132,8 @@ targets: - test/core/promise/status_flag_test.cc deps: - gtest - - absl/status:status - absl/status:statusor + - gpr uses_polling: false - name: status_helper_test gtest: true @@ -17004,10 +17010,12 @@ targets: - src/core/lib/load_balancing/lb_policy_registry.h - src/core/lib/load_balancing/subchannel_interface.h - src/core/lib/promise/activity.h + - src/core/lib/promise/all_ok.h - src/core/lib/promise/arena_promise.h - src/core/lib/promise/cancel_callback.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h + - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/promise_factory.h - src/core/lib/promise/detail/promise_like.h - src/core/lib/promise/detail/seq_state.h @@ -17027,6 +17035,7 @@ targets: - src/core/lib/promise/seq.h - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h + - src/core/lib/promise/try_join.h - src/core/lib/promise/try_seq.h - src/core/lib/resolver/endpoint_addresses.h - src/core/lib/resolver/resolver.h @@ -17071,6 +17080,7 @@ targets: - src/core/lib/surface/lame_client.h - src/core/lib/surface/server.h - src/core/lib/surface/validate_metadata.h + - src/core/lib/surface/wait_for_cq_end_op.h - src/core/lib/transport/batch_builder.h - src/core/lib/transport/connectivity_state.h - src/core/lib/transport/custom_metadata.h @@ -18003,8 +18013,10 @@ targets: - src/core/lib/gprpp/bitset.h - src/core/lib/promise/detail/join_state.h - src/core/lib/promise/detail/promise_like.h + - src/core/lib/promise/detail/status.h - src/core/lib/promise/map.h - src/core/lib/promise/poll.h + - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h - src/core/lib/promise/try_join.h src: @@ -18041,6 +18053,7 @@ targets: - src/core/lib/promise/detail/seq_state.h - src/core/lib/promise/detail/status.h - src/core/lib/promise/poll.h + - src/core/lib/promise/status_flag.h - src/core/lib/promise/trace.h - src/core/lib/promise/try_seq.h src: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 5bc1877ac30..83f07e2fe62 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -1128,10 +1128,12 @@ Pod::Spec.new do |s| 'src/core/lib/load_balancing/subchannel_interface.h', 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', + 'src/core/lib/promise/all_ok.h', 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', + 'src/core/lib/promise/detail/join_state.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/seq_state.h', @@ -1153,6 +1155,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/status_flag.h', 'src/core/lib/promise/trace.h', + 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/endpoint_addresses.h', 'src/core/lib/resolver/resolver.h', @@ -1251,6 +1254,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/lame_client.h', 'src/core/lib/surface/server.h', 'src/core/lib/surface/validate_metadata.h', + 'src/core/lib/surface/wait_for_cq_end_op.h', 'src/core/lib/transport/batch_builder.h', 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/connectivity_state.h', @@ -2364,10 +2368,12 @@ Pod::Spec.new do |s| 'src/core/lib/load_balancing/subchannel_interface.h', 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', + 'src/core/lib/promise/all_ok.h', 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', + 'src/core/lib/promise/detail/join_state.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/seq_state.h', @@ -2389,6 +2395,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/status_flag.h', 'src/core/lib/promise/trace.h', + 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/endpoint_addresses.h', 'src/core/lib/resolver/resolver.h', @@ -2487,6 +2494,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/lame_client.h', 'src/core/lib/surface/server.h', 'src/core/lib/surface/validate_metadata.h', + 'src/core/lib/surface/wait_for_cq_end_op.h', 'src/core/lib/transport/batch_builder.h', 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/connectivity_state.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 728112f7bb5..64a415514b5 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1737,10 +1737,12 @@ Pod::Spec.new do |s| 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.cc', 'src/core/lib/promise/activity.h', + 'src/core/lib/promise/all_ok.h', 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', + 'src/core/lib/promise/detail/join_state.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/seq_state.h', @@ -1765,6 +1767,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/status_flag.h', 'src/core/lib/promise/trace.cc', 'src/core/lib/promise/trace.h', + 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/endpoint_addresses.cc', 'src/core/lib/resolver/endpoint_addresses.h', @@ -1968,6 +1971,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/validate_metadata.cc', 'src/core/lib/surface/validate_metadata.h', 'src/core/lib/surface/version.cc', + 'src/core/lib/surface/wait_for_cq_end_op.h', 'src/core/lib/transport/batch_builder.cc', 'src/core/lib/transport/batch_builder.h', 'src/core/lib/transport/bdp_estimator.cc', @@ -3130,10 +3134,12 @@ Pod::Spec.new do |s| 'src/core/lib/load_balancing/subchannel_interface.h', 'src/core/lib/matchers/matchers.h', 'src/core/lib/promise/activity.h', + 'src/core/lib/promise/all_ok.h', 'src/core/lib/promise/arena_promise.h', 'src/core/lib/promise/cancel_callback.h', 'src/core/lib/promise/context.h', 'src/core/lib/promise/detail/basic_seq.h', + 'src/core/lib/promise/detail/join_state.h', 'src/core/lib/promise/detail/promise_factory.h', 'src/core/lib/promise/detail/promise_like.h', 'src/core/lib/promise/detail/seq_state.h', @@ -3155,6 +3161,7 @@ Pod::Spec.new do |s| 'src/core/lib/promise/sleep.h', 'src/core/lib/promise/status_flag.h', 'src/core/lib/promise/trace.h', + 'src/core/lib/promise/try_join.h', 'src/core/lib/promise/try_seq.h', 'src/core/lib/resolver/endpoint_addresses.h', 'src/core/lib/resolver/resolver.h', @@ -3253,6 +3260,7 @@ Pod::Spec.new do |s| 'src/core/lib/surface/lame_client.h', 'src/core/lib/surface/server.h', 'src/core/lib/surface/validate_metadata.h', + 'src/core/lib/surface/wait_for_cq_end_op.h', 'src/core/lib/transport/batch_builder.h', 'src/core/lib/transport/bdp_estimator.h', 'src/core/lib/transport/connectivity_state.h', diff --git a/grpc.gemspec b/grpc.gemspec index 67b0773a721..82993ca7ac9 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1640,10 +1640,12 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/matchers/matchers.h ) s.files += %w( src/core/lib/promise/activity.cc ) s.files += %w( src/core/lib/promise/activity.h ) + s.files += %w( src/core/lib/promise/all_ok.h ) s.files += %w( src/core/lib/promise/arena_promise.h ) s.files += %w( src/core/lib/promise/cancel_callback.h ) s.files += %w( src/core/lib/promise/context.h ) s.files += %w( src/core/lib/promise/detail/basic_seq.h ) + s.files += %w( src/core/lib/promise/detail/join_state.h ) s.files += %w( src/core/lib/promise/detail/promise_factory.h ) s.files += %w( src/core/lib/promise/detail/promise_like.h ) s.files += %w( src/core/lib/promise/detail/seq_state.h ) @@ -1668,6 +1670,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/promise/status_flag.h ) s.files += %w( src/core/lib/promise/trace.cc ) s.files += %w( src/core/lib/promise/trace.h ) + s.files += %w( src/core/lib/promise/try_join.h ) s.files += %w( src/core/lib/promise/try_seq.h ) s.files += %w( src/core/lib/resolver/endpoint_addresses.cc ) s.files += %w( src/core/lib/resolver/endpoint_addresses.h ) @@ -1871,6 +1874,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/surface/validate_metadata.cc ) s.files += %w( src/core/lib/surface/validate_metadata.h ) s.files += %w( src/core/lib/surface/version.cc ) + s.files += %w( src/core/lib/surface/wait_for_cq_end_op.h ) s.files += %w( src/core/lib/transport/batch_builder.cc ) s.files += %w( src/core/lib/transport/batch_builder.h ) s.files += %w( src/core/lib/transport/bdp_estimator.cc ) diff --git a/package.xml b/package.xml index 54d49bdff94..9ed1cf552fb 100644 --- a/package.xml +++ b/package.xml @@ -1622,10 +1622,12 @@ + + @@ -1650,6 +1652,7 @@ + @@ -1853,6 +1856,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index 1c9774b816b..7c273dce60e 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -439,6 +439,7 @@ grpc_cc_library( ], deps = [ "promise_status", + "//:gpr", "//:gpr_platform", ], ) @@ -710,6 +711,7 @@ grpc_cc_library( "join_state", "map", "poll", + "status_flag", "//:gpr_platform", ], ) @@ -810,6 +812,7 @@ grpc_cc_library( "promise_like", "promise_status", "seq_state", + "status_flag", "//:debug_location", "//:gpr_platform", ], diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index e958b03a7d8..24f30687182 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -96,7 +96,7 @@ ClientTransport::ClientTransport( }, // Write buffers to corresponding endpoints concurrently. [this]() { - return TryJoin( + return TryJoin( control_endpoint_->Write( std::move(control_endpoint_write_buffer_)), data_endpoint_->Write(std::move(data_endpoint_write_buffer_))); @@ -134,7 +134,7 @@ ClientTransport::ClientTransport( .value()); // Read header and trailers from control endpoint. // Read message padding and message from data endpoint. - return TryJoin( + return TryJoin( control_endpoint_->Read(frame_header_->GetFrameLength()), data_endpoint_->Read(frame_header_->message_padding + frame_header_->message_length)); diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index 86f9072fcfc..23ecdbfe84a 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -105,7 +105,7 @@ class ClientTransport { std::move(pipe_server_frames.sender)))); } return TrySeq( - TryJoin( + TryJoin( // Continuously send client frame with client to server messages. ForEach(std::move(*call_args.client_to_server_messages), [stream_id, initial_frame = true, diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index 3f8edc6dc4f..528a299632d 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -326,6 +326,11 @@ void grpc_channel_stack::InitClientCallSpine( grpc_core::CallSpineInterface* call) { for (size_t i = 0; i < count; i++) { auto* elem = grpc_channel_stack_element(this, i); + if (elem->filter->init_call == nullptr) { + grpc_core::Crash( + absl::StrCat("Filter '", elem->filter->name, + "' does not support the call-v3 interface")); + } elem->filter->init_call(elem, call); } } @@ -334,6 +339,11 @@ void grpc_channel_stack::InitServerCallSpine( grpc_core::CallSpineInterface* call) { for (size_t i = 0; i < count; i++) { auto* elem = grpc_channel_stack_element(this, count - 1 - i); + if (elem->filter->init_call == nullptr) { + grpc_core::Crash( + absl::StrCat("Filter '", elem->filter->name, + "' does not support the call-v3 interface")); + } elem->filter->init_call(elem, call); } } diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index b7eccc29dd4..9ae7e14ee90 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -873,10 +873,12 @@ grpc_channel_filter MakeConnectedFilter() { // do this, and I'm not sure what that is yet. This is only "safe" // because call stacks place no additional data after the last call // element, and the last call element MUST be the connected channel. - channel_stack->call_stack_size += - static_cast(elem->channel_data) - ->transport->filter_stack_transport() - ->SizeOfStream(); + auto* transport = + static_cast(elem->channel_data)->transport; + if (transport->filter_stack_transport() != nullptr) { + channel_stack->call_stack_size += + transport->filter_stack_transport()->SizeOfStream(); + } }, connected_channel_destroy_channel_elem, connected_channel_get_channel_info, @@ -884,13 +886,27 @@ grpc_channel_filter MakeConnectedFilter() { }; } -ArenaPromise MakeTransportCallPromise( - Transport*, CallArgs, NextPromiseFactory) { - Crash("unimplemented"); +ArenaPromise MakeClientTransportCallPromise( + Transport* transport, CallArgs call_args, NextPromiseFactory) { + auto spine = GetContext()->MakeCallSpine(std::move(call_args)); + transport->client_transport()->StartCall(CallHandler{spine}); + return Map(spine->server_trailing_metadata().receiver.Next(), + [](NextResult r) { + if (r.has_value()) { + auto md = std::move(r.value()); + md->Set(GrpcStatusFromWire(), true); + return md; + } + auto m = GetContext()->MakePooled( + GetContext()); + m->Set(GrpcStatusMetadata(), GRPC_STATUS_CANCELLED); + m->Set(GrpcCallWasCancelled(), true); + return m; + }); } -const grpc_channel_filter kPromiseBasedTransportFilter = - MakeConnectedFilter(); +const grpc_channel_filter kClientPromiseBasedTransportFilter = + MakeConnectedFilter(); #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL const grpc_channel_filter kClientEmulatedFilter = @@ -908,11 +924,37 @@ const grpc_channel_filter kServerEmulatedFilter = MakeConnectedFilter(); #endif -bool TransportSupportsPromiseBasedCalls(const ChannelArgs& args) { +// noop filter for the v3 stack: placeholder for now because other code requires +// we have a terminator. +// TODO(ctiller): delete when v3 transition is complete. +const grpc_channel_filter kServerPromiseBasedTransportFilter = { + nullptr, + [](grpc_channel_element*, CallArgs, NextPromiseFactory) + -> ArenaPromise { Crash("not implemented"); }, + /* init_call: */ [](grpc_channel_element*, CallSpineInterface*) {}, + connected_channel_start_transport_op, + 0, + nullptr, + set_pollset_or_pollset_set, + nullptr, + sizeof(channel_data), + connected_channel_init_channel_elem, + +[](grpc_channel_stack*, grpc_channel_element*) {}, + connected_channel_destroy_channel_elem, + connected_channel_get_channel_info, + "connected", +}; + +bool TransportSupportsClientPromiseBasedCalls(const ChannelArgs& args) { auto* transport = args.GetObject(); return transport->client_transport() != nullptr; } +bool TransportSupportsServerPromiseBasedCalls(const ChannelArgs& args) { + auto* transport = args.GetObject(); + return transport->server_transport() != nullptr; +} + } // namespace void RegisterConnectedChannel(CoreConfiguration::Builder* builder) { @@ -925,32 +967,33 @@ void RegisterConnectedChannel(CoreConfiguration::Builder* builder) { // Option 1, and our ideal: the transport supports promise based calls, // and so we simply use the transport directly. builder->channel_init() - ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kPromiseBasedTransportFilter) + ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, + &kClientPromiseBasedTransportFilter) .Terminal() - .If(TransportSupportsPromiseBasedCalls); + .If(TransportSupportsClientPromiseBasedCalls); builder->channel_init() ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, - &kPromiseBasedTransportFilter) + &kClientPromiseBasedTransportFilter) .Terminal() - .If(TransportSupportsPromiseBasedCalls); + .If(TransportSupportsClientPromiseBasedCalls); builder->channel_init() - ->RegisterFilter(GRPC_SERVER_CHANNEL, &kPromiseBasedTransportFilter) + ->RegisterFilter(GRPC_SERVER_CHANNEL, &kServerPromiseBasedTransportFilter) .Terminal() - .If(TransportSupportsPromiseBasedCalls); + .If(TransportSupportsServerPromiseBasedCalls); // Option 2: the transport does not support promise based calls. builder->channel_init() ->RegisterFilter(GRPC_CLIENT_SUBCHANNEL, &kClientEmulatedFilter) .Terminal() - .IfNot(TransportSupportsPromiseBasedCalls); + .IfNot(TransportSupportsClientPromiseBasedCalls); builder->channel_init() ->RegisterFilter(GRPC_CLIENT_DIRECT_CHANNEL, &kClientEmulatedFilter) .Terminal() - .IfNot(TransportSupportsPromiseBasedCalls); + .IfNot(TransportSupportsClientPromiseBasedCalls); builder->channel_init() ->RegisterFilter(GRPC_SERVER_CHANNEL, &kServerEmulatedFilter) .Terminal() - .IfNot(TransportSupportsPromiseBasedCalls); + .IfNot(TransportSupportsServerPromiseBasedCalls); } } // namespace grpc_core diff --git a/src/core/lib/promise/detail/promise_factory.h b/src/core/lib/promise/detail/promise_factory.h index d278a650782..127819598a2 100644 --- a/src/core/lib/promise/detail/promise_factory.h +++ b/src/core/lib/promise/detail/promise_factory.h @@ -18,6 +18,7 @@ #include #include +#include #include #include "absl/meta/type_traits.h" diff --git a/src/core/lib/promise/latch.h b/src/core/lib/promise/latch.h index 4ade84b8939..67002234b7c 100644 --- a/src/core/lib/promise/latch.h +++ b/src/core/lib/promise/latch.h @@ -185,6 +185,8 @@ class Latch { waiter_.Wake(); } + bool is_set() const { return is_set_; } + private: std::string DebugTag() { return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", diff --git a/src/core/lib/promise/status_flag.h b/src/core/lib/promise/status_flag.h index 0e5af4f0da8..d9067509c32 100644 --- a/src/core/lib/promise/status_flag.h +++ b/src/core/lib/promise/status_flag.h @@ -21,6 +21,8 @@ #include "absl/status/statusor.h" #include "absl/types/optional.h" +#include + #include "src/core/lib/promise/detail/status.h" namespace grpc_core { @@ -41,6 +43,16 @@ struct StatusCastImpl { static absl::Status Cast(Success) { return absl::OkStatus(); } }; +template <> +struct StatusCastImpl { + static absl::Status Cast(Failure) { return absl::CancelledError(); } +}; + +template +struct StatusCastImpl, Failure> { + static absl::StatusOr Cast(Failure) { return absl::CancelledError(); } +}; + // A boolean representing whether an operation succeeded (true) or failed // (false). class StatusFlag { @@ -91,18 +103,25 @@ class ValueOrFailure { ValueOrFailure(T value) : value_(std::move(value)) {} // NOLINTNEXTLINE(google-explicit-constructor) ValueOrFailure(Failure) {} + // NOLINTNEXTLINE(google-explicit-constructor) + ValueOrFailure(StatusFlag status) { GPR_ASSERT(!status.ok()); } static ValueOrFailure FromOptional(absl::optional value) { return ValueOrFailure{std::move(value)}; } bool ok() const { return value_.has_value(); } + StatusFlag status() const { return StatusFlag(ok()); } const T& value() const { return value_.value(); } T& value() { return value_.value(); } const T& operator*() const { return *value_; } T& operator*() { return *value_; } + bool operator==(const ValueOrFailure& other) const { + return value_ == other.value_; + } + private: absl::optional value_; }; @@ -117,13 +136,6 @@ inline T TakeValue(ValueOrFailure&& value) { return std::move(value.value()); } -template -struct StatusCastImpl> { - static absl::Status Cast(const ValueOrFailure flag) { - return flag.ok() ? absl::OkStatus() : absl::CancelledError(); - } -}; - template struct StatusCastImpl, ValueOrFailure> { static absl::StatusOr Cast(ValueOrFailure value) { @@ -132,6 +144,29 @@ struct StatusCastImpl, ValueOrFailure> { } }; +template +struct StatusCastImpl, Failure> { + static ValueOrFailure Cast(Failure) { + return ValueOrFailure(Failure{}); + } +}; + +template +struct StatusCastImpl, StatusFlag&> { + static ValueOrFailure Cast(StatusFlag f) { + GPR_ASSERT(!f.ok()); + return ValueOrFailure(Failure{}); + } +}; + +template +struct StatusCastImpl, StatusFlag> { + static ValueOrFailure Cast(StatusFlag f) { + GPR_ASSERT(!f.ok()); + return ValueOrFailure(Failure{}); + } +}; + } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_STATUS_FLAG_H \ No newline at end of file diff --git a/src/core/lib/promise/try_join.h b/src/core/lib/promise/try_join.h index 14503db5796..be3354dc9b7 100644 --- a/src/core/lib/promise/try_join.h +++ b/src/core/lib/promise/try_join.h @@ -27,6 +27,7 @@ #include "src/core/lib/promise/detail/join_state.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/poll.h" +#include "src/core/lib/promise/status_flag.h" namespace grpc_core { @@ -44,48 +45,68 @@ T IntoResult(absl::StatusOr* status) { inline Empty IntoResult(absl::Status*) { return Empty{}; } // Traits object to pass to BasicJoin +template