diff --git a/BUILD b/BUILD index b1797baa2e8..832e608be67 100644 --- a/BUILD +++ b/BUILD @@ -1099,6 +1099,7 @@ grpc_cc_library( name = "promise", external_deps = [ "absl/types:optional", + "absl/status", ], language = "c++", public_hdrs = [ @@ -1313,6 +1314,7 @@ grpc_cc_library( "context", "gpr_base", "gpr_codegen", + "orphanable", "poll", "promise_factory", "promise_status", @@ -1866,6 +1868,7 @@ grpc_cc_library( "src/core/lib/channel/channelz_registry.cc", "src/core/lib/channel/connected_channel.cc", "src/core/lib/channel/handshaker.cc", + "src/core/lib/channel/promise_based_filter.cc", "src/core/lib/channel/status_util.cc", "src/core/lib/compression/compression.cc", "src/core/lib/compression/compression_internal.cc", @@ -3698,6 +3701,7 @@ grpc_cc_library( "gpr_base", "grpc_base", "grpc_security_base", + "promise", "ref_counted_ptr", "tsi_fake_credentials", ], @@ -3716,6 +3720,7 @@ grpc_cc_library( deps = [ "gpr", "grpc_security_base", + "promise", "ref_counted_ptr", "tsi_local_credentials", ], @@ -3758,6 +3763,7 @@ grpc_cc_library( "grpc_client_channel", "grpc_security_base", "grpc_sockaddr", + "promise", "ref_counted_ptr", "sockaddr_utils", "tsi_local_credentials", @@ -3787,6 +3793,7 @@ grpc_cc_library( "gpr_base", "grpc_base", "grpc_security_base", + "promise", "ref_counted_ptr", "tsi_alts_credentials", "tsi_base", @@ -3814,6 +3821,7 @@ grpc_cc_library( "grpc_credentials_util", "grpc_security_base", "grpc_transport_chttp2_alpn", + "promise", "ref_counted_ptr", "tsi_base", "tsi_ssl_credentials", @@ -3882,6 +3890,7 @@ grpc_cc_library( "grpc_base", "grpc_credentials_util", "grpc_security_base", + "promise", "tsi_base", "tsi_ssl_credentials", ], @@ -3904,6 +3913,7 @@ grpc_cc_library( "gpr_base", "grpc_base", "grpc_security_base", + "promise", "ref_counted_ptr", ], ) @@ -3935,6 +3945,7 @@ grpc_cc_library( "httpcli", "httpcli_ssl_credentials", "json", + "promise", "ref_counted", "ref_counted_ptr", "tsi_ssl_types", @@ -3954,9 +3965,11 @@ grpc_cc_library( "absl/container:inlined_vector", "absl/strings", "absl/strings:str_format", + "absl/status", ], language = "c++", deps = [ + "capture", "gpr_base", "grpc_base", "grpc_codegen", @@ -3965,6 +3978,7 @@ grpc_cc_library( "httpcli", "httpcli_ssl_credentials", "json", + "promise", "ref_counted_ptr", "uri_parser", ], @@ -4023,6 +4037,7 @@ grpc_cc_library( "gpr_base", "grpc_base", "grpc_security_base", + "promise", "ref_counted_ptr", "tsi_ssl_credentials", ], @@ -4058,6 +4073,7 @@ grpc_cc_library( "httpcli", "httpcli_ssl_credentials", "json", + "promise", "ref_counted", "ref_counted_ptr", "slice", @@ -4084,6 +4100,7 @@ grpc_cc_library( name = "grpc_security_base", srcs = [ "src/core/lib/security/context/security_context.cc", + "src/core/lib/security/credentials/call_creds_util.cc", "src/core/lib/security/credentials/composite/composite_credentials.cc", "src/core/lib/security/credentials/credentials.cc", "src/core/lib/security/credentials/plugin/plugin_credentials.cc", @@ -4096,6 +4113,7 @@ grpc_cc_library( ], hdrs = [ "src/core/lib/security/context/security_context.h", + "src/core/lib/security/credentials/call_creds_util.h", "src/core/lib/security/credentials/composite/composite_credentials.h", "src/core/lib/security/credentials/credentials.h", "src/core/lib/security/credentials/plugin/plugin_credentials.h", @@ -4115,13 +4133,16 @@ grpc_cc_library( visibility = ["@grpc:public"], deps = [ "arena", + "arena_promise", "config", "gpr_base", "grpc_base", "grpc_trace", "json", + "promise", "ref_counted", "ref_counted_ptr", + "try_seq", "tsi_base", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index e58b5d73aab..0398c9df385 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1988,6 +1988,7 @@ add_library(grpc src/core/lib/channel/connected_channel.cc src/core/lib/channel/handshaker.cc src/core/lib/channel/handshaker_registry.cc + src/core/lib/channel/promise_based_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc @@ -2124,6 +2125,7 @@ add_library(grpc src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc + src/core/lib/security/credentials/call_creds_util.cc src/core/lib/security/credentials/channel_creds_registry_init.cc src/core/lib/security/credentials/composite/composite_credentials.cc src/core/lib/security/credentials/credentials.cc @@ -2630,6 +2632,7 @@ add_library(grpc_unsecure src/core/lib/channel/connected_channel.cc src/core/lib/channel/handshaker.cc src/core/lib/channel/handshaker_registry.cc + src/core/lib/channel/promise_based_filter.cc src/core/lib/channel/status_util.cc src/core/lib/compression/compression.cc src/core/lib/compression/compression_internal.cc @@ -2753,6 +2756,7 @@ add_library(grpc_unsecure src/core/lib/security/authorization/evaluate_args.cc src/core/lib/security/authorization/grpc_server_authz_filter.cc src/core/lib/security/context/security_context.cc + src/core/lib/security/credentials/call_creds_util.cc src/core/lib/security/credentials/composite/composite_credentials.cc src/core/lib/security/credentials/credentials.cc src/core/lib/security/credentials/fake/fake_credentials.cc @@ -7373,52 +7377,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(activity_test - src/core/ext/upb-generated/google/protobuf/any.upb.c - src/core/ext/upb-generated/google/rpc/status.upb.c - src/core/lib/gpr/alloc.cc - src/core/lib/gpr/atm.cc - src/core/lib/gpr/cpu_iphone.cc - src/core/lib/gpr/cpu_linux.cc - src/core/lib/gpr/cpu_posix.cc - src/core/lib/gpr/cpu_windows.cc - src/core/lib/gpr/env_linux.cc - src/core/lib/gpr/env_posix.cc - src/core/lib/gpr/env_windows.cc - src/core/lib/gpr/log.cc - src/core/lib/gpr/log_android.cc - src/core/lib/gpr/log_linux.cc - src/core/lib/gpr/log_posix.cc - src/core/lib/gpr/log_windows.cc - src/core/lib/gpr/murmur_hash.cc - src/core/lib/gpr/string.cc - src/core/lib/gpr/string_posix.cc - src/core/lib/gpr/string_util_windows.cc - src/core/lib/gpr/string_windows.cc - src/core/lib/gpr/sync.cc - src/core/lib/gpr/sync_abseil.cc - src/core/lib/gpr/sync_posix.cc - src/core/lib/gpr/sync_windows.cc - src/core/lib/gpr/time.cc - src/core/lib/gpr/time_posix.cc - src/core/lib/gpr/time_precise.cc - src/core/lib/gpr/time_windows.cc - src/core/lib/gpr/tmpfile_msys.cc - src/core/lib/gpr/tmpfile_posix.cc - src/core/lib/gpr/tmpfile_windows.cc - src/core/lib/gpr/wrap_memcpy.cc - src/core/lib/gprpp/examine_stack.cc - src/core/lib/gprpp/fork.cc - src/core/lib/gprpp/global_config_env.cc - src/core/lib/gprpp/host_port.cc - src/core/lib/gprpp/mpscq.cc - src/core/lib/gprpp/stat_posix.cc - src/core/lib/gprpp/stat_windows.cc - src/core/lib/gprpp/status_helper.cc - src/core/lib/gprpp/thd_posix.cc - src/core/lib/gprpp/thd_windows.cc - src/core/lib/gprpp/time_util.cc - src/core/lib/profiling/basic_timers.cc - src/core/lib/profiling/stap_timers.cc + src/core/lib/debug/trace.cc src/core/lib/promise/activity.cc test/core/promise/activity_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -7447,21 +7406,10 @@ target_include_directories(activity_test target_link_libraries(activity_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} - absl::base - absl::core_headers absl::flat_hash_set - absl::memory - absl::random_random - absl::status absl::statusor - absl::cord - absl::str_format - absl::strings - absl::synchronization - absl::time - absl::optional absl::variant - upb + gpr ) @@ -12503,52 +12451,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(latch_test - src/core/ext/upb-generated/google/protobuf/any.upb.c - src/core/ext/upb-generated/google/rpc/status.upb.c - src/core/lib/gpr/alloc.cc - src/core/lib/gpr/atm.cc - src/core/lib/gpr/cpu_iphone.cc - src/core/lib/gpr/cpu_linux.cc - src/core/lib/gpr/cpu_posix.cc - src/core/lib/gpr/cpu_windows.cc - src/core/lib/gpr/env_linux.cc - src/core/lib/gpr/env_posix.cc - src/core/lib/gpr/env_windows.cc - src/core/lib/gpr/log.cc - src/core/lib/gpr/log_android.cc - src/core/lib/gpr/log_linux.cc - src/core/lib/gpr/log_posix.cc - src/core/lib/gpr/log_windows.cc - src/core/lib/gpr/murmur_hash.cc - src/core/lib/gpr/string.cc - src/core/lib/gpr/string_posix.cc - src/core/lib/gpr/string_util_windows.cc - src/core/lib/gpr/string_windows.cc - src/core/lib/gpr/sync.cc - src/core/lib/gpr/sync_abseil.cc - src/core/lib/gpr/sync_posix.cc - src/core/lib/gpr/sync_windows.cc - src/core/lib/gpr/time.cc - src/core/lib/gpr/time_posix.cc - src/core/lib/gpr/time_precise.cc - src/core/lib/gpr/time_windows.cc - src/core/lib/gpr/tmpfile_msys.cc - src/core/lib/gpr/tmpfile_posix.cc - src/core/lib/gpr/tmpfile_windows.cc - src/core/lib/gpr/wrap_memcpy.cc - src/core/lib/gprpp/examine_stack.cc - src/core/lib/gprpp/fork.cc - src/core/lib/gprpp/global_config_env.cc - src/core/lib/gprpp/host_port.cc - src/core/lib/gprpp/mpscq.cc - src/core/lib/gprpp/stat_posix.cc - src/core/lib/gprpp/stat_windows.cc - src/core/lib/gprpp/status_helper.cc - src/core/lib/gprpp/thd_posix.cc - src/core/lib/gprpp/thd_windows.cc - src/core/lib/gprpp/time_util.cc - src/core/lib/profiling/basic_timers.cc - src/core/lib/profiling/stap_timers.cc + src/core/lib/debug/trace.cc src/core/lib/promise/activity.cc test/core/promise/latch_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -12577,20 +12480,9 @@ target_include_directories(latch_test target_link_libraries(latch_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} - absl::base - absl::core_headers - absl::memory - absl::random_random - absl::status absl::statusor - absl::cord - absl::str_format - absl::strings - absl::synchronization - absl::time - absl::optional absl::variant - upb + gpr ) @@ -13166,52 +13058,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(observable_test - src/core/ext/upb-generated/google/protobuf/any.upb.c - src/core/ext/upb-generated/google/rpc/status.upb.c - src/core/lib/gpr/alloc.cc - src/core/lib/gpr/atm.cc - src/core/lib/gpr/cpu_iphone.cc - src/core/lib/gpr/cpu_linux.cc - src/core/lib/gpr/cpu_posix.cc - src/core/lib/gpr/cpu_windows.cc - src/core/lib/gpr/env_linux.cc - src/core/lib/gpr/env_posix.cc - src/core/lib/gpr/env_windows.cc - src/core/lib/gpr/log.cc - src/core/lib/gpr/log_android.cc - src/core/lib/gpr/log_linux.cc - src/core/lib/gpr/log_posix.cc - src/core/lib/gpr/log_windows.cc - src/core/lib/gpr/murmur_hash.cc - src/core/lib/gpr/string.cc - src/core/lib/gpr/string_posix.cc - src/core/lib/gpr/string_util_windows.cc - src/core/lib/gpr/string_windows.cc - src/core/lib/gpr/sync.cc - src/core/lib/gpr/sync_abseil.cc - src/core/lib/gpr/sync_posix.cc - src/core/lib/gpr/sync_windows.cc - src/core/lib/gpr/time.cc - src/core/lib/gpr/time_posix.cc - src/core/lib/gpr/time_precise.cc - src/core/lib/gpr/time_windows.cc - src/core/lib/gpr/tmpfile_msys.cc - src/core/lib/gpr/tmpfile_posix.cc - src/core/lib/gpr/tmpfile_windows.cc - src/core/lib/gpr/wrap_memcpy.cc - src/core/lib/gprpp/examine_stack.cc - src/core/lib/gprpp/fork.cc - src/core/lib/gprpp/global_config_env.cc - src/core/lib/gprpp/host_port.cc - src/core/lib/gprpp/mpscq.cc - src/core/lib/gprpp/stat_posix.cc - src/core/lib/gprpp/stat_windows.cc - src/core/lib/gprpp/status_helper.cc - src/core/lib/gprpp/thd_posix.cc - src/core/lib/gprpp/thd_windows.cc - src/core/lib/gprpp/time_util.cc - src/core/lib/profiling/basic_timers.cc - src/core/lib/profiling/stap_timers.cc + src/core/lib/debug/trace.cc src/core/lib/promise/activity.cc test/core/promise/observable_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -13240,21 +13087,10 @@ target_include_directories(observable_test target_link_libraries(observable_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} - absl::base - absl::core_headers absl::flat_hash_set - absl::memory - absl::random_random - absl::status absl::statusor - absl::cord - absl::str_format - absl::strings - absl::synchronization - absl::time - absl::optional absl::variant - upb + gpr ) @@ -13603,6 +13439,7 @@ target_link_libraries(promise_factory_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} absl::bind_front + absl::status absl::optional absl::variant absl::utility @@ -13640,6 +13477,7 @@ target_include_directories(promise_map_test target_link_libraries(promise_map_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::status absl::optional absl::variant ) @@ -13676,6 +13514,7 @@ target_include_directories(promise_test target_link_libraries(promise_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::status absl::optional absl::variant ) diff --git a/Makefile b/Makefile index 4e29ddfdcfd..84d404dc139 100644 --- a/Makefile +++ b/Makefile @@ -1429,6 +1429,7 @@ LIBGRPC_SRC = \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/handshaker.cc \ src/core/lib/channel/handshaker_registry.cc \ + src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ @@ -1565,6 +1566,7 @@ LIBGRPC_SRC = \ src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc \ src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc \ src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc \ + src/core/lib/security/credentials/call_creds_util.cc \ src/core/lib/security/credentials/channel_creds_registry_init.cc \ src/core/lib/security/credentials/composite/composite_credentials.cc \ src/core/lib/security/credentials/credentials.cc \ @@ -1920,6 +1922,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/handshaker.cc \ src/core/lib/channel/handshaker_registry.cc \ + src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ @@ -2043,6 +2046,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/security/authorization/evaluate_args.cc \ src/core/lib/security/authorization/grpc_server_authz_filter.cc \ src/core/lib/security/context/security_context.cc \ + src/core/lib/security/credentials/call_creds_util.cc \ src/core/lib/security/credentials/composite/composite_credentials.cc \ src/core/lib/security/credentials/credentials.cc \ src/core/lib/security/credentials/fake/fake_credentials.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index e95d8105bb1..739c88bf393 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -970,6 +970,7 @@ libs: - src/core/lib/security/credentials/alts/alts_credentials.h - src/core/lib/security/credentials/alts/check_gcp_environment.h - src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h + - src/core/lib/security/credentials/call_creds_util.h - src/core/lib/security/credentials/channel_creds_registry.h - src/core/lib/security/credentials/composite/composite_credentials.h - src/core/lib/security/credentials/credentials.h @@ -1487,6 +1488,7 @@ libs: - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/handshaker.cc - src/core/lib/channel/handshaker_registry.cc + - src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc @@ -1623,6 +1625,7 @@ libs: - src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc - src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc - src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc + - src/core/lib/security/credentials/call_creds_util.cc - src/core/lib/security/credentials/channel_creds_registry_init.cc - src/core/lib/security/credentials/composite/composite_credentials.cc - src/core/lib/security/credentials/credentials.cc @@ -2129,6 +2132,7 @@ libs: - src/core/lib/security/authorization/evaluate_args.h - src/core/lib/security/authorization/grpc_server_authz_filter.h - src/core/lib/security/context/security_context.h + - src/core/lib/security/credentials/call_creds_util.h - src/core/lib/security/credentials/channel_creds_registry.h - src/core/lib/security/credentials/composite/composite_credentials.h - src/core/lib/security/credentials/credentials.h @@ -2308,6 +2312,7 @@ libs: - src/core/lib/channel/connected_channel.cc - src/core/lib/channel/handshaker.cc - src/core/lib/channel/handshaker_registry.cc + - src/core/lib/channel/promise_based_filter.cc - src/core/lib/channel/status_util.cc - src/core/lib/compression/compression.cc - src/core/lib/compression/compression_internal.cc @@ -2431,6 +2436,7 @@ libs: - src/core/lib/security/authorization/evaluate_args.cc - src/core/lib/security/authorization/grpc_server_authz_filter.cc - src/core/lib/security/context/security_context.cc + - src/core/lib/security/credentials/call_creds_util.cc - src/core/lib/security/credentials/composite/composite_credentials.cc - src/core/lib/security/credentials/credentials.cc - src/core/lib/security/credentials/fake/fake_credentials.cc @@ -4411,38 +4417,12 @@ targets: build: test language: c++ headers: - - src/core/ext/upb-generated/google/protobuf/any.upb.h - - src/core/ext/upb-generated/google/rpc/status.upb.h - - src/core/lib/gpr/alloc.h - - src/core/lib/gpr/env.h - - src/core/lib/gpr/murmur_hash.h - - src/core/lib/gpr/spinlock.h - - src/core/lib/gpr/string.h - - src/core/lib/gpr/string_windows.h - - src/core/lib/gpr/time_precise.h - - src/core/lib/gpr/tls.h - - src/core/lib/gpr/tmpfile.h - - src/core/lib/gpr/useful.h + - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - - src/core/lib/gprpp/construct_destruct.h - - src/core/lib/gprpp/debug_location.h - - src/core/lib/gprpp/examine_stack.h - - src/core/lib/gprpp/fork.h - - src/core/lib/gprpp/global_config.h - - src/core/lib/gprpp/global_config_custom.h - - src/core/lib/gprpp/global_config_env.h - - src/core/lib/gprpp/global_config_generic.h - - src/core/lib/gprpp/host_port.h - - src/core/lib/gprpp/manual_constructor.h - - src/core/lib/gprpp/memory.h - - src/core/lib/gprpp/mpscq.h - - src/core/lib/gprpp/stat.h - - src/core/lib/gprpp/status_helper.h - - src/core/lib/gprpp/sync.h - - src/core/lib/gprpp/thd.h - - src/core/lib/gprpp/time_util.h - - src/core/lib/profiling/timers.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/promise/activity.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h @@ -4458,70 +4438,14 @@ targets: - src/core/lib/promise/wait_set.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/ext/upb-generated/google/protobuf/any.upb.c - - src/core/ext/upb-generated/google/rpc/status.upb.c - - src/core/lib/gpr/alloc.cc - - src/core/lib/gpr/atm.cc - - src/core/lib/gpr/cpu_iphone.cc - - src/core/lib/gpr/cpu_linux.cc - - src/core/lib/gpr/cpu_posix.cc - - src/core/lib/gpr/cpu_windows.cc - - src/core/lib/gpr/env_linux.cc - - src/core/lib/gpr/env_posix.cc - - src/core/lib/gpr/env_windows.cc - - src/core/lib/gpr/log.cc - - src/core/lib/gpr/log_android.cc - - src/core/lib/gpr/log_linux.cc - - src/core/lib/gpr/log_posix.cc - - src/core/lib/gpr/log_windows.cc - - src/core/lib/gpr/murmur_hash.cc - - src/core/lib/gpr/string.cc - - src/core/lib/gpr/string_posix.cc - - src/core/lib/gpr/string_util_windows.cc - - src/core/lib/gpr/string_windows.cc - - src/core/lib/gpr/sync.cc - - src/core/lib/gpr/sync_abseil.cc - - src/core/lib/gpr/sync_posix.cc - - src/core/lib/gpr/sync_windows.cc - - src/core/lib/gpr/time.cc - - src/core/lib/gpr/time_posix.cc - - src/core/lib/gpr/time_precise.cc - - src/core/lib/gpr/time_windows.cc - - src/core/lib/gpr/tmpfile_msys.cc - - src/core/lib/gpr/tmpfile_posix.cc - - src/core/lib/gpr/tmpfile_windows.cc - - src/core/lib/gpr/wrap_memcpy.cc - - src/core/lib/gprpp/examine_stack.cc - - src/core/lib/gprpp/fork.cc - - src/core/lib/gprpp/global_config_env.cc - - src/core/lib/gprpp/host_port.cc - - src/core/lib/gprpp/mpscq.cc - - src/core/lib/gprpp/stat_posix.cc - - src/core/lib/gprpp/stat_windows.cc - - src/core/lib/gprpp/status_helper.cc - - src/core/lib/gprpp/thd_posix.cc - - src/core/lib/gprpp/thd_windows.cc - - src/core/lib/gprpp/time_util.cc - - src/core/lib/profiling/basic_timers.cc - - src/core/lib/profiling/stap_timers.cc + - src/core/lib/debug/trace.cc - src/core/lib/promise/activity.cc - test/core/promise/activity_test.cc deps: - - absl/base:base - - absl/base:core_headers - absl/container:flat_hash_set - - absl/memory:memory - - absl/random:random - - absl/status:status - absl/status:statusor - - absl/strings:cord - - absl/strings:str_format - - absl/strings:strings - - absl/synchronization:synchronization - - absl/time:time - - absl/types:optional - absl/types:variant - - upb + - gpr uses_polling: false - name: address_sorting_test gtest: true @@ -5690,6 +5614,7 @@ targets: headers: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/time.h @@ -6516,38 +6441,12 @@ targets: build: test language: c++ headers: - - src/core/ext/upb-generated/google/protobuf/any.upb.h - - src/core/ext/upb-generated/google/rpc/status.upb.h - - src/core/lib/gpr/alloc.h - - src/core/lib/gpr/env.h - - src/core/lib/gpr/murmur_hash.h - - src/core/lib/gpr/spinlock.h - - src/core/lib/gpr/string.h - - src/core/lib/gpr/string_windows.h - - src/core/lib/gpr/time_precise.h - - src/core/lib/gpr/tls.h - - src/core/lib/gpr/tmpfile.h - - src/core/lib/gpr/useful.h + - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - - src/core/lib/gprpp/construct_destruct.h - - src/core/lib/gprpp/debug_location.h - - src/core/lib/gprpp/examine_stack.h - - src/core/lib/gprpp/fork.h - - src/core/lib/gprpp/global_config.h - - src/core/lib/gprpp/global_config_custom.h - - src/core/lib/gprpp/global_config_env.h - - src/core/lib/gprpp/global_config_generic.h - - src/core/lib/gprpp/host_port.h - - src/core/lib/gprpp/manual_constructor.h - - src/core/lib/gprpp/memory.h - - src/core/lib/gprpp/mpscq.h - - src/core/lib/gprpp/stat.h - - src/core/lib/gprpp/status_helper.h - - src/core/lib/gprpp/sync.h - - src/core/lib/gprpp/thd.h - - src/core/lib/gprpp/time_util.h - - src/core/lib/profiling/timers.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/promise/activity.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_join.h @@ -6563,69 +6462,13 @@ targets: - src/core/lib/promise/seq.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/ext/upb-generated/google/protobuf/any.upb.c - - src/core/ext/upb-generated/google/rpc/status.upb.c - - src/core/lib/gpr/alloc.cc - - src/core/lib/gpr/atm.cc - - src/core/lib/gpr/cpu_iphone.cc - - src/core/lib/gpr/cpu_linux.cc - - src/core/lib/gpr/cpu_posix.cc - - src/core/lib/gpr/cpu_windows.cc - - src/core/lib/gpr/env_linux.cc - - src/core/lib/gpr/env_posix.cc - - src/core/lib/gpr/env_windows.cc - - src/core/lib/gpr/log.cc - - src/core/lib/gpr/log_android.cc - - src/core/lib/gpr/log_linux.cc - - src/core/lib/gpr/log_posix.cc - - src/core/lib/gpr/log_windows.cc - - src/core/lib/gpr/murmur_hash.cc - - src/core/lib/gpr/string.cc - - src/core/lib/gpr/string_posix.cc - - src/core/lib/gpr/string_util_windows.cc - - src/core/lib/gpr/string_windows.cc - - src/core/lib/gpr/sync.cc - - src/core/lib/gpr/sync_abseil.cc - - src/core/lib/gpr/sync_posix.cc - - src/core/lib/gpr/sync_windows.cc - - src/core/lib/gpr/time.cc - - src/core/lib/gpr/time_posix.cc - - src/core/lib/gpr/time_precise.cc - - src/core/lib/gpr/time_windows.cc - - src/core/lib/gpr/tmpfile_msys.cc - - src/core/lib/gpr/tmpfile_posix.cc - - src/core/lib/gpr/tmpfile_windows.cc - - src/core/lib/gpr/wrap_memcpy.cc - - src/core/lib/gprpp/examine_stack.cc - - src/core/lib/gprpp/fork.cc - - src/core/lib/gprpp/global_config_env.cc - - src/core/lib/gprpp/host_port.cc - - src/core/lib/gprpp/mpscq.cc - - src/core/lib/gprpp/stat_posix.cc - - src/core/lib/gprpp/stat_windows.cc - - src/core/lib/gprpp/status_helper.cc - - src/core/lib/gprpp/thd_posix.cc - - src/core/lib/gprpp/thd_windows.cc - - src/core/lib/gprpp/time_util.cc - - src/core/lib/profiling/basic_timers.cc - - src/core/lib/profiling/stap_timers.cc + - src/core/lib/debug/trace.cc - src/core/lib/promise/activity.cc - test/core/promise/latch_test.cc deps: - - absl/base:base - - absl/base:core_headers - - absl/memory:memory - - absl/random:random - - absl/status:status - absl/status:statusor - - absl/strings:cord - - absl/strings:str_format - - absl/strings:strings - - absl/synchronization:synchronization - - absl/time:time - - absl/types:optional - absl/types:variant - - upb + - gpr uses_polling: false - name: lb_get_cpu_stats_test gtest: true @@ -6851,37 +6694,11 @@ targets: build: test language: c++ headers: - - src/core/ext/upb-generated/google/protobuf/any.upb.h - - src/core/ext/upb-generated/google/rpc/status.upb.h - - src/core/lib/gpr/alloc.h - - src/core/lib/gpr/env.h - - src/core/lib/gpr/murmur_hash.h - - src/core/lib/gpr/spinlock.h - - src/core/lib/gpr/string.h - - src/core/lib/gpr/string_windows.h - - src/core/lib/gpr/time_precise.h - - src/core/lib/gpr/tls.h - - src/core/lib/gpr/tmpfile.h - - src/core/lib/gpr/useful.h + - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - - src/core/lib/gprpp/construct_destruct.h - - src/core/lib/gprpp/debug_location.h - - src/core/lib/gprpp/examine_stack.h - - src/core/lib/gprpp/fork.h - - src/core/lib/gprpp/global_config.h - - src/core/lib/gprpp/global_config_custom.h - - src/core/lib/gprpp/global_config_env.h - - src/core/lib/gprpp/global_config_generic.h - - src/core/lib/gprpp/host_port.h - - src/core/lib/gprpp/manual_constructor.h - - src/core/lib/gprpp/memory.h - - src/core/lib/gprpp/mpscq.h - - src/core/lib/gprpp/stat.h - - src/core/lib/gprpp/status_helper.h - - src/core/lib/gprpp/sync.h - - src/core/lib/gprpp/thd.h - - src/core/lib/gprpp/time_util.h - - src/core/lib/profiling/timers.h + - src/core/lib/gprpp/orphanable.h + - src/core/lib/gprpp/ref_counted.h + - src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/promise/activity.h - src/core/lib/promise/context.h - src/core/lib/promise/detail/basic_seq.h @@ -6896,70 +6713,14 @@ targets: - src/core/lib/promise/wait_set.h - test/core/promise/test_wakeup_schedulers.h src: - - src/core/ext/upb-generated/google/protobuf/any.upb.c - - src/core/ext/upb-generated/google/rpc/status.upb.c - - src/core/lib/gpr/alloc.cc - - src/core/lib/gpr/atm.cc - - src/core/lib/gpr/cpu_iphone.cc - - src/core/lib/gpr/cpu_linux.cc - - src/core/lib/gpr/cpu_posix.cc - - src/core/lib/gpr/cpu_windows.cc - - src/core/lib/gpr/env_linux.cc - - src/core/lib/gpr/env_posix.cc - - src/core/lib/gpr/env_windows.cc - - src/core/lib/gpr/log.cc - - src/core/lib/gpr/log_android.cc - - src/core/lib/gpr/log_linux.cc - - src/core/lib/gpr/log_posix.cc - - src/core/lib/gpr/log_windows.cc - - src/core/lib/gpr/murmur_hash.cc - - src/core/lib/gpr/string.cc - - src/core/lib/gpr/string_posix.cc - - src/core/lib/gpr/string_util_windows.cc - - src/core/lib/gpr/string_windows.cc - - src/core/lib/gpr/sync.cc - - src/core/lib/gpr/sync_abseil.cc - - src/core/lib/gpr/sync_posix.cc - - src/core/lib/gpr/sync_windows.cc - - src/core/lib/gpr/time.cc - - src/core/lib/gpr/time_posix.cc - - src/core/lib/gpr/time_precise.cc - - src/core/lib/gpr/time_windows.cc - - src/core/lib/gpr/tmpfile_msys.cc - - src/core/lib/gpr/tmpfile_posix.cc - - src/core/lib/gpr/tmpfile_windows.cc - - src/core/lib/gpr/wrap_memcpy.cc - - src/core/lib/gprpp/examine_stack.cc - - src/core/lib/gprpp/fork.cc - - src/core/lib/gprpp/global_config_env.cc - - src/core/lib/gprpp/host_port.cc - - src/core/lib/gprpp/mpscq.cc - - src/core/lib/gprpp/stat_posix.cc - - src/core/lib/gprpp/stat_windows.cc - - src/core/lib/gprpp/status_helper.cc - - src/core/lib/gprpp/thd_posix.cc - - src/core/lib/gprpp/thd_windows.cc - - src/core/lib/gprpp/time_util.cc - - src/core/lib/profiling/basic_timers.cc - - src/core/lib/profiling/stap_timers.cc + - src/core/lib/debug/trace.cc - src/core/lib/promise/activity.cc - test/core/promise/observable_test.cc deps: - - absl/base:base - - absl/base:core_headers - absl/container:flat_hash_set - - absl/memory:memory - - absl/random:random - - absl/status:status - absl/status:statusor - - absl/strings:cord - - absl/strings:str_format - - absl/strings:strings - - absl/synchronization:synchronization - - absl/time:time - - absl/types:optional - absl/types:variant - - upb + - gpr uses_polling: false - name: orphanable_test gtest: true @@ -7124,6 +6885,7 @@ targets: - test/core/promise/promise_factory_test.cc deps: - absl/functional:bind_front + - absl/status:status - absl/types:optional - absl/types:variant - absl/utility:utility @@ -7140,6 +6902,7 @@ targets: src: - test/core/promise/map_test.cc deps: + - absl/status:status - absl/types:optional - absl/types:variant uses_polling: false @@ -7154,6 +6917,7 @@ targets: src: - test/core/promise/promise_test.cc deps: + - absl/status:status - absl/types:optional - absl/types:variant uses_polling: false diff --git a/config.m4 b/config.m4 index b744d3e35d7..4553bec115e 100644 --- a/config.m4 +++ b/config.m4 @@ -447,6 +447,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/channel/connected_channel.cc \ src/core/lib/channel/handshaker.cc \ src/core/lib/channel/handshaker_registry.cc \ + src/core/lib/channel/promise_based_filter.cc \ src/core/lib/channel/status_util.cc \ src/core/lib/compression/compression.cc \ src/core/lib/compression/compression_internal.cc \ @@ -627,6 +628,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc \ src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc \ src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc \ + src/core/lib/security/credentials/call_creds_util.cc \ src/core/lib/security/credentials/channel_creds_registry_init.cc \ src/core/lib/security/credentials/composite/composite_credentials.cc \ src/core/lib/security/credentials/credentials.cc \ diff --git a/config.w32 b/config.w32 index 7c1cf958c8f..a6e3d6bce98 100644 --- a/config.w32 +++ b/config.w32 @@ -413,6 +413,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\channel\\connected_channel.cc " + "src\\core\\lib\\channel\\handshaker.cc " + "src\\core\\lib\\channel\\handshaker_registry.cc " + + "src\\core\\lib\\channel\\promise_based_filter.cc " + "src\\core\\lib\\channel\\status_util.cc " + "src\\core\\lib\\compression\\compression.cc " + "src\\core\\lib\\compression\\compression_internal.cc " + @@ -593,6 +594,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\security\\credentials\\alts\\grpc_alts_credentials_client_options.cc " + "src\\core\\lib\\security\\credentials\\alts\\grpc_alts_credentials_options.cc " + "src\\core\\lib\\security\\credentials\\alts\\grpc_alts_credentials_server_options.cc " + + "src\\core\\lib\\security\\credentials\\call_creds_util.cc " + "src\\core\\lib\\security\\credentials\\channel_creds_registry_init.cc " + "src\\core\\lib\\security\\credentials\\composite\\composite_credentials.cc " + "src\\core\\lib\\security\\credentials\\credentials.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index daa38fd9b43..35fd3f8286e 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -816,6 +816,7 @@ Pod::Spec.new do |s| 'src/core/lib/security/credentials/alts/alts_credentials.h', 'src/core/lib/security/credentials/alts/check_gcp_environment.h', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h', + 'src/core/lib/security/credentials/call_creds_util.h', 'src/core/lib/security/credentials/channel_creds_registry.h', 'src/core/lib/security/credentials/composite/composite_credentials.h', 'src/core/lib/security/credentials/credentials.h', @@ -1614,6 +1615,7 @@ Pod::Spec.new do |s| 'src/core/lib/security/credentials/alts/alts_credentials.h', 'src/core/lib/security/credentials/alts/check_gcp_environment.h', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h', + 'src/core/lib/security/credentials/call_creds_util.h', 'src/core/lib/security/credentials/channel_creds_registry.h', 'src/core/lib/security/credentials/composite/composite_credentials.h', 'src/core/lib/security/credentials/credentials.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 7ca112584b7..93b39a97698 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -993,6 +993,7 @@ Pod::Spec.new do |s| 'src/core/lib/channel/handshaker_factory.h', 'src/core/lib/channel/handshaker_registry.cc', 'src/core/lib/channel/handshaker_registry.h', + 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/promise_based_filter.h', 'src/core/lib/channel/status_util.cc', 'src/core/lib/channel/status_util.h', @@ -1342,6 +1343,8 @@ Pod::Spec.new do |s| 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc', + 'src/core/lib/security/credentials/call_creds_util.cc', + 'src/core/lib/security/credentials/call_creds_util.h', 'src/core/lib/security/credentials/channel_creds_registry.h', 'src/core/lib/security/credentials/channel_creds_registry_init.cc', 'src/core/lib/security/credentials/composite/composite_credentials.cc', @@ -2209,6 +2212,7 @@ Pod::Spec.new do |s| 'src/core/lib/security/credentials/alts/alts_credentials.h', 'src/core/lib/security/credentials/alts/check_gcp_environment.h', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h', + 'src/core/lib/security/credentials/call_creds_util.h', 'src/core/lib/security/credentials/channel_creds_registry.h', 'src/core/lib/security/credentials/composite/composite_credentials.h', 'src/core/lib/security/credentials/credentials.h', diff --git a/grpc.gemspec b/grpc.gemspec index 1f9de679e9a..1a84d9eb853 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -912,6 +912,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/channel/handshaker_factory.h ) s.files += %w( src/core/lib/channel/handshaker_registry.cc ) s.files += %w( src/core/lib/channel/handshaker_registry.h ) + s.files += %w( src/core/lib/channel/promise_based_filter.cc ) s.files += %w( src/core/lib/channel/promise_based_filter.h ) s.files += %w( src/core/lib/channel/status_util.cc ) s.files += %w( src/core/lib/channel/status_util.h ) @@ -1261,6 +1262,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc ) s.files += %w( src/core/lib/security/credentials/alts/grpc_alts_credentials_options.h ) s.files += %w( src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc ) + s.files += %w( src/core/lib/security/credentials/call_creds_util.cc ) + s.files += %w( src/core/lib/security/credentials/call_creds_util.h ) s.files += %w( src/core/lib/security/credentials/channel_creds_registry.h ) s.files += %w( src/core/lib/security/credentials/channel_creds_registry_init.cc ) s.files += %w( src/core/lib/security/credentials/composite/composite_credentials.cc ) diff --git a/grpc.gyp b/grpc.gyp index be77bb59c99..2c89374f8aa 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -891,6 +891,7 @@ 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', 'src/core/lib/channel/handshaker_registry.cc', + 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', @@ -1027,6 +1028,7 @@ 'src/core/lib/security/credentials/alts/grpc_alts_credentials_client_options.cc', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_options.cc', 'src/core/lib/security/credentials/alts/grpc_alts_credentials_server_options.cc', + 'src/core/lib/security/credentials/call_creds_util.cc', 'src/core/lib/security/credentials/channel_creds_registry_init.cc', 'src/core/lib/security/credentials/composite/composite_credentials.cc', 'src/core/lib/security/credentials/credentials.cc', @@ -1353,6 +1355,7 @@ 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', 'src/core/lib/channel/handshaker_registry.cc', + 'src/core/lib/channel/promise_based_filter.cc', 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', @@ -1476,6 +1479,7 @@ 'src/core/lib/security/authorization/evaluate_args.cc', 'src/core/lib/security/authorization/grpc_server_authz_filter.cc', 'src/core/lib/security/context/security_context.cc', + 'src/core/lib/security/credentials/call_creds_util.cc', 'src/core/lib/security/credentials/composite/composite_credentials.cc', 'src/core/lib/security/credentials/credentials.cc', 'src/core/lib/security/credentials/fake/fake_credentials.cc', diff --git a/package.xml b/package.xml index 10e3e312618..013b15ebffa 100644 --- a/package.xml +++ b/package.xml @@ -892,6 +892,7 @@ + @@ -1241,6 +1242,8 @@ + + diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 0c0556abcdc..ad1021b822b 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -19,6 +19,10 @@ #ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H #define GRPC_CORE_LIB_CHANNEL_CONTEXT_H +#include + +#include "src/core/lib/promise/context.h" + /// Call object context pointers. /// Call context is represented as an array of \a grpc_call_context_elements. @@ -49,4 +53,11 @@ struct grpc_call_context_element { void (*destroy)(void*) = nullptr; }; +namespace grpc_core { +// Bind the legacy context array into the new style structure +// TODO(ctiller): remove as we migrate these contexts to the new system. +template <> +struct ContextType {}; +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_CHANNEL_CONTEXT_H */ diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc new file mode 100644 index 00000000000..6ba9ef1de3d --- /dev/null +++ b/src/core/lib/channel/promise_based_filter.cc @@ -0,0 +1,50 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/channel/promise_based_filter.h" + +#include "src/core/lib/channel/channel_stack.h" + +namespace grpc_core { +namespace promise_filter_detail { + +// We don't form ActivityPtr's to this type, and consequently don't need +// Orphan(). +void BaseCallData::Orphan() { abort(); } + +// For now we don't care about owning/non-owning wakers, instead just share +// implementation. +Waker BaseCallData::MakeNonOwningWaker() { return MakeOwningWaker(); } + +Waker BaseCallData::MakeOwningWaker() { + GRPC_CALL_STACK_REF(call_stack_, "waker"); + return Waker(this); +} + +void BaseCallData::Wakeup() { + auto wakeup = [](void* p, grpc_error_handle) { + auto* self = static_cast(p); + self->OnWakeup(); + self->Drop(); + }; + auto* closure = GRPC_CLOSURE_CREATE(wakeup, this, nullptr); + GRPC_CALL_COMBINER_START(call_combiner_, closure, GRPC_ERROR_NONE, "wakeup"); +} + +void BaseCallData::Drop() { GRPC_CALL_STACK_UNREF(call_stack_, "waker"); } + +} // namespace promise_filter_detail +} // namespace grpc_core diff --git a/src/core/lib/channel/promise_based_filter.h b/src/core/lib/channel/promise_based_filter.h index 001eac288eb..c89f76d81f6 100644 --- a/src/core/lib/channel/promise_based_filter.h +++ b/src/core/lib/channel/promise_based_filter.h @@ -27,6 +27,7 @@ #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/promise/arena_promise.h" @@ -77,19 +78,34 @@ enum class FilterEndpoint { namespace promise_filter_detail { // Call data shared between all implementations of promise-based filters. -class BaseCallData { +class BaseCallData : public Activity, private Wakeable { public: BaseCallData(grpc_call_element* elem, const grpc_call_element_args* args) - : elem_(elem), + : call_stack_(args->call_stack), + elem_(elem), arena_(args->arena), call_combiner_(args->call_combiner), - deadline_(args->deadline) {} + deadline_(args->deadline), + context_(args->context) {} + + void set_pollent(grpc_polling_entity* pollent) { pollent_ = pollent; } + + // Activity implementation (partial). + void Orphan() final; + Waker MakeNonOwningWaker() final; + Waker MakeOwningWaker() final; protected: - class ScopedContext : public promise_detail::Context { + class ScopedContext + : public promise_detail::Context, + public promise_detail::Context, + public promise_detail::Context { public: explicit ScopedContext(BaseCallData* call_data) - : promise_detail::Context(call_data->arena_) {} + : promise_detail::Context(call_data->arena_), + promise_detail::Context( + call_data->context_), + promise_detail::Context(call_data->pollent_) {} }; static MetadataHandle WrapMetadata( @@ -105,12 +121,22 @@ class BaseCallData { grpc_call_element* elem() const { return elem_; } CallCombiner* call_combiner() const { return call_combiner_; } Timestamp deadline() const { return deadline_; } + grpc_call_stack* call_stack() const { return call_stack_; } private: + // Wakeable implementation. + void Wakeup() final; + void Drop() final; + + virtual void OnWakeup() = 0; + + grpc_call_stack* const call_stack_; grpc_call_element* const elem_; Arena* const arena_; CallCombiner* const call_combiner_; const Timestamp deadline_; + grpc_call_context_element* const context_; + grpc_polling_entity* pollent_ = nullptr; }; // Specific call data per channel filter. @@ -130,11 +156,17 @@ class CallData : public BaseCallData { grpc_schedule_on_exec_ctx); } - ~CallData() { + ~CallData() override { GPR_ASSERT(!is_polling_); GRPC_ERROR_UNREF(cancelled_error_); } + // Activity implementation. + void ForceImmediateRepoll() final { + GPR_ASSERT(is_polling_); + repoll_ = true; + } + // Handle one grpc_transport_stream_op_batch void StartBatch(grpc_transport_stream_op_batch* batch) { // Fake out the activity based context. @@ -235,9 +267,23 @@ class CallData : public BaseCallData { if (recv_trailing_state_ == RecvTrailingState::kQueued) { recv_trailing_state_ = RecvTrailingState::kCancelled; } - grpc_transport_stream_op_batch_finish_with_failure( - absl::exchange(send_initial_metadata_batch_, nullptr), - GRPC_ERROR_REF(cancelled_error_), call_combiner()); + struct FailBatch : public grpc_closure { + grpc_transport_stream_op_batch* batch; + CallCombiner* call_combiner; + }; + auto fail = [](void* p, grpc_error_handle error) { + auto* f = static_cast(p); + grpc_transport_stream_op_batch_finish_with_failure( + f->batch, GRPC_ERROR_REF(error), f->call_combiner); + delete f; + }; + auto* b = new FailBatch(); + GRPC_CLOSURE_INIT(b, fail, b, nullptr); + b->batch = absl::exchange(send_initial_metadata_batch_, nullptr); + b->call_combiner = call_combiner(); + GRPC_CALL_COMBINER_START(call_combiner(), b, + GRPC_ERROR_REF(cancelled_error_), + "cancel pending batch"); } else { send_initial_state_ = SendInitialState::kCancelled; } @@ -250,12 +296,15 @@ class CallData : public BaseCallData { ChannelFilter* filter = static_cast(elem()->channel_data); // Construct the promise. - promise_ = filter->MakeCallPromise( - WrapMetadata(send_initial_metadata_batch_->payload - ->send_initial_metadata.send_initial_metadata), - [this](ClientInitialMetadata initial_metadata) { - return MakeNextPromise(std::move(initial_metadata)); - }); + { + ScopedActivity activity(this); + promise_ = filter->MakeCallPromise( + WrapMetadata(send_initial_metadata_batch_->payload + ->send_initial_metadata.send_initial_metadata), + [this](ClientInitialMetadata initial_metadata) { + return MakeNextPromise(std::move(initial_metadata)); + }); + } // Poll once. WakeInsideCombiner(); } @@ -366,24 +415,75 @@ class CallData : public BaseCallData { GPR_ASSERT(!is_polling_); grpc_closure* call_closure = nullptr; is_polling_ = true; + grpc_error_handle cancel_send_initial_metadata_error = GRPC_ERROR_NONE; + grpc_transport_stream_op_batch* forward_batch = nullptr; switch (send_initial_state_) { case SendInitialState::kQueued: case SendInitialState::kForwarded: { // Poll the promise once since we're waiting for it. - Poll poll = promise_(); + Poll poll; + { + ScopedActivity activity(this); + poll = promise_(); + } if (auto* r = absl::get_if(&poll)) { - GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kComplete); - GPR_ASSERT(recv_trailing_metadata_ == UnwrapMetadata(std::move(*r))); - recv_trailing_state_ = RecvTrailingState::kResponded; - call_closure = - absl::exchange(original_recv_trailing_metadata_ready_, nullptr); + promise_ = ArenaPromise(); + auto* md = UnwrapMetadata(std::move(*r)); + bool destroy_md = true; + switch (recv_trailing_state_) { + case RecvTrailingState::kComplete: + if (recv_trailing_metadata_ != md) { + *recv_trailing_metadata_ = std::move(*md); + } else { + destroy_md = false; + } + recv_trailing_state_ = RecvTrailingState::kResponded; + call_closure = absl::exchange( + original_recv_trailing_metadata_ready_, nullptr); + break; + case RecvTrailingState::kQueued: + case RecvTrailingState::kForwarded: { + GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != + GRPC_STATUS_OK); + grpc_error_handle error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "early return from promise based filter"), + GRPC_ERROR_INT_GRPC_STATUS, + *md->get_pointer(GrpcStatusMetadata())); + if (auto* message = md->get_pointer(GrpcMessageMetadata())) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, + message->as_string_view()); + } + if (recv_trailing_state_ == RecvTrailingState::kQueued) { + GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued); + send_initial_state_ = SendInitialState::kCancelled; + cancel_send_initial_metadata_error = error; + } else { + call_combiner()->Cancel(GRPC_ERROR_REF(error)); + forward_batch = + grpc_make_transport_stream_op(GRPC_CLOSURE_CREATE( + [](void*, grpc_error_handle) {}, nullptr, nullptr)); + forward_batch->cancel_stream = true; + forward_batch->payload->cancel_stream.cancel_error = error; + } + recv_trailing_state_ = RecvTrailingState::kCancelled; + } break; + case RecvTrailingState::kInitial: + abort(); // unimplemented + case RecvTrailingState::kResponded: + case RecvTrailingState::kCancelled: + abort(); // unreachable + } + if (destroy_md) { + md->~grpc_metadata_batch(); + } } } break; case SendInitialState::kInitial: case SendInitialState::kCancelled: - // If we get a response without sending anything, we just propagate that - // up. (note: that situation isn't possible once we finish the promise - // transition). + // If we get a response without sending anything, we just propagate + // that up. (note: that situation isn't possible once we finish the + // promise transition). if (recv_trailing_state_ == RecvTrailingState::kComplete) { recv_trailing_state_ = RecvTrailingState::kResponded; call_closure = @@ -391,14 +491,64 @@ class CallData : public BaseCallData { } break; } + GRPC_CALL_STACK_REF(call_stack(), "finish_poll"); is_polling_ = false; + bool in_combiner = true; + bool repoll = absl::exchange(repoll_, false); + if (forward_batch != nullptr) { + GPR_ASSERT(in_combiner); + in_combiner = false; + forward_send_initial_metadata_ = false; + grpc_call_next_op(elem(), forward_batch); + } + if (cancel_send_initial_metadata_error != GRPC_ERROR_NONE) { + GPR_ASSERT(in_combiner); + forward_send_initial_metadata_ = false; + in_combiner = false; + grpc_transport_stream_op_batch_finish_with_failure( + absl::exchange(send_initial_metadata_batch_, nullptr), + cancel_send_initial_metadata_error, call_combiner()); + } if (absl::exchange(forward_send_initial_metadata_, false)) { + GPR_ASSERT(in_combiner); + in_combiner = false; grpc_call_next_op(elem(), absl::exchange(send_initial_metadata_batch_, nullptr)); } if (call_closure != nullptr) { + GPR_ASSERT(in_combiner); + in_combiner = false; Closure::Run(DEBUG_LOCATION, call_closure, GRPC_ERROR_NONE); } + if (repoll) { + if (in_combiner) { + WakeInsideCombiner(); + } else { + struct NextPoll : public grpc_closure { + grpc_call_stack* call_stack; + CallData* call_data; + }; + auto run = [](void* p, grpc_error_handle) { + auto* next_poll = static_cast(p); + next_poll->call_data->WakeInsideCombiner(); + GRPC_CALL_STACK_UNREF(next_poll->call_stack, "re-poll"); + delete next_poll; + }; + auto* p = new NextPoll; + GRPC_CALL_STACK_REF(call_stack(), "re-poll"); + GRPC_CLOSURE_INIT(p, run, p, nullptr); + GRPC_CALL_COMBINER_START(call_combiner(), p, GRPC_ERROR_NONE, + "re-poll"); + } + } else if (in_combiner) { + GRPC_CALL_COMBINER_STOP(call_combiner(), "poll paused"); + } + GRPC_CALL_STACK_UNREF(call_stack(), "finish_poll"); + } + + void OnWakeup() override { + ScopedContext context(this); + WakeInsideCombiner(); } // Contained promise @@ -419,6 +569,8 @@ class CallData : public BaseCallData { RecvTrailingState recv_trailing_state_ = RecvTrailingState::kInitial; // Whether we're currently polling the promise. bool is_polling_ = false; + // Should we repoll after completing polling? + bool repoll_ = false; // Whether we should forward send initial metadata after polling? bool forward_send_initial_metadata_ = false; }; @@ -434,18 +586,21 @@ class CallData : public BaseCallData { grpc_schedule_on_exec_ctx); } - ~CallData() { + ~CallData() override { GPR_ASSERT(!is_polling_); GRPC_ERROR_UNREF(cancelled_error_); } + // Activity implementation. + void ForceImmediateRepoll() final { abort(); } // Not implemented. + // Handle one grpc_transport_stream_op_batch void StartBatch(grpc_transport_stream_op_batch* batch) { // Fake out the activity based context. ScopedContext context(this); - // If this is a cancel stream, cancel anything we have pending and propagate - // the cancellation. + // If this is a cancel stream, cancel anything we have pending and + // propagate the cancellation. if (batch->cancel_stream) { GPR_ASSERT(!batch->send_initial_metadata && !batch->send_trailing_metadata && !batch->send_message && @@ -535,9 +690,23 @@ class CallData : public BaseCallData { promise_ = ArenaPromise(); if (send_trailing_state_ == SendTrailingState::kQueued) { send_trailing_state_ = SendTrailingState::kCancelled; - grpc_transport_stream_op_batch_finish_with_failure( - absl::exchange(send_trailing_metadata_batch_, nullptr), - GRPC_ERROR_REF(cancelled_error_), call_combiner()); + struct FailBatch : public grpc_closure { + grpc_transport_stream_op_batch* batch; + CallCombiner* call_combiner; + }; + auto fail = [](void* p, grpc_error_handle error) { + auto* f = static_cast(p); + grpc_transport_stream_op_batch_finish_with_failure( + f->batch, GRPC_ERROR_REF(error), f->call_combiner); + delete f; + }; + auto* b = new FailBatch(); + GRPC_CLOSURE_INIT(b, fail, b, nullptr); + b->batch = absl::exchange(send_trailing_metadata_batch_, nullptr); + b->call_combiner = call_combiner(); + GRPC_CALL_COMBINER_START(call_combiner(), b, + GRPC_ERROR_REF(cancelled_error_), + "cancel pending batch"); } else { send_trailing_state_ = SendTrailingState::kCancelled; } @@ -623,7 +792,11 @@ class CallData : public BaseCallData { bool forward_send_trailing_metadata = false; is_polling_ = true; if (recv_initial_state_ == RecvInitialState::kComplete) { - Poll poll = promise_(); + Poll poll; + { + ScopedActivity activity(this); + poll = promise_(); + } if (auto* r = absl::get_if(&poll)) { auto* md = UnwrapMetadata(std::move(*r)); bool destroy_md = true; @@ -672,6 +845,8 @@ class CallData : public BaseCallData { } } + void OnWakeup() override { abort(); } // not implemented + // Contained promise ArenaPromise promise_; // Pointer to where initial metadata will be stored. @@ -736,7 +911,9 @@ MakePromiseBasedFilter(const char* name) { return GRPC_ERROR_NONE; }, // set_pollset_or_pollset_set - grpc_call_stack_ignore_set_pollset_or_pollset_set, + [](grpc_call_element* elem, grpc_polling_entity* pollent) { + static_cast(elem->call_data)->set_pollent(pollent); + }, // destroy_call_elem [](grpc_call_element* elem, const grpc_call_final_info*, grpc_closure*) { static_cast(elem->call_data)->~CallData(); diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index 5defca225b6..6a3158906fb 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -32,6 +32,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/promise/promise.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/security_connector/ssl_utils.h" #include "src/core/lib/security/transport/security_handshaker.h" @@ -116,17 +117,9 @@ class grpc_httpcli_ssl_channel_security_connector final return strcmp(secure_peer_name_, other->secure_peer_name_); } - bool check_call_host(absl::string_view /*host*/, - grpc_auth_context* /*auth_context*/, - grpc_closure* /*on_call_host_checked*/, - grpc_error_handle* error) override { - *error = GRPC_ERROR_NONE; - return true; - } - - void cancel_check_call_host(grpc_closure* /*on_call_host_checked*/, - grpc_error_handle error) override { - GRPC_ERROR_UNREF(error); + ArenaPromise CheckCallHost(absl::string_view, + grpc_auth_context*) override { + return ImmediateOkStatus(); } const char* secure_peer_name() const { return secure_peer_name_; } diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h index 6f4c5bdd665..e177176d614 100644 --- a/src/core/lib/iomgr/polling_entity.h +++ b/src/core/lib/iomgr/polling_entity.h @@ -23,6 +23,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/promise/context.h" typedef enum grpc_pollset_tag { GRPC_POLLS_NONE, @@ -65,4 +66,9 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst); +namespace grpc_core { +template <> +struct ContextType {}; +} // namespace grpc_core + #endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */ diff --git a/src/core/lib/promise/activity.cc b/src/core/lib/promise/activity.cc index 00b11b05271..adb35272264 100644 --- a/src/core/lib/promise/activity.cc +++ b/src/core/lib/promise/activity.cc @@ -28,14 +28,16 @@ namespace grpc_core { GPR_THREAD_LOCAL(Activity*) Activity::g_current_activity_{nullptr}; Waker::Unwakeable Waker::unwakeable_; +namespace promise_detail { + /////////////////////////////////////////////////////////////////////////////// // HELPER TYPES // Weak handle to an Activity. // Handle can persist while Activity goes away. -class Activity::Handle final : public Wakeable { +class FreestandingActivity::Handle final : public Wakeable { public: - explicit Handle(Activity* activity) : activity_(activity) {} + explicit Handle(FreestandingActivity* activity) : activity_(activity) {} // Ref the Handle (not the activity). void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } @@ -57,7 +59,7 @@ class Activity::Handle final : public Wakeable { // against DropActivity, so we need to only increase activities refcount if // it is non-zero. if (activity_ && activity_->RefIfNonzero()) { - Activity* activity = activity_; + FreestandingActivity* activity = activity_; mu_.Unlock(); // Activity still exists and we have a reference: wake it up, which will // drop the ref. @@ -85,15 +87,15 @@ class Activity::Handle final : public Wakeable { // activity. std::atomic refs_{2}; Mutex mu_ ABSL_ACQUIRED_AFTER(activity_->mu_); - Activity* activity_ ABSL_GUARDED_BY(mu_); + FreestandingActivity* activity_ ABSL_GUARDED_BY(mu_); }; /////////////////////////////////////////////////////////////////////////////// // ACTIVITY IMPLEMENTATION -bool Activity::RefIfNonzero() { return IncrementIfNonzero(&refs_); } +bool FreestandingActivity::RefIfNonzero() { return IncrementIfNonzero(&refs_); } -Activity::Handle* Activity::RefHandle() { +FreestandingActivity::Handle* FreestandingActivity::RefHandle() { if (handle_ == nullptr) { // No handle created yet - construct it and return it. handle_ = new Handle(this); @@ -105,11 +107,15 @@ Activity::Handle* Activity::RefHandle() { } } -void Activity::DropHandle() { +void FreestandingActivity::DropHandle() { handle_->DropActivity(); handle_ = nullptr; } -Waker Activity::MakeNonOwningWaker() { return Waker(RefHandle()); } +Waker FreestandingActivity::MakeNonOwningWaker() { + mu_.AssertHeld(); + return Waker(RefHandle()); +} +} // namespace promise_detail } // namespace grpc_core diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index 0743ec2d02e..01aed8b3e20 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -37,6 +37,7 @@ #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/construct_destruct.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_factory.h" @@ -110,22 +111,8 @@ class Waker { // Activity execution may be cancelled by simply deleting the activity. In such // a case, if execution had not already finished, the done callback would be // called with absl::CancelledError(). -class Activity : private Wakeable { +class Activity : public Orphanable { public: - // Cancel execution of the underlying promise. - virtual void Cancel() ABSL_LOCKS_EXCLUDED(mu_) = 0; - - // Destroy the Activity - used for the type alias ActivityPtr. - struct Deleter { - void operator()(Activity* activity) { - activity->Cancel(); - activity->Unref(); - } - }; - - // Fetch the size of the implementation of this activity. - virtual size_t Size() = 0; - // Force wakeup from the outside. // This should be rarely needed, and usages should be accompanied with a note // on why it's not possible to wakeup with a Waker object. @@ -133,11 +120,8 @@ class Activity : private Wakeable { // an Activity to repoll. void ForceWakeup() { MakeOwningWaker().Wakeup(); } - // Wakeup the current threads activity - will force a subsequent poll after - // the one that's running. - static void WakeupCurrent() { - current()->SetActionDuringRun(ActionDuringRun::kWakeup); - } + // Force the current activity to immediately repoll if it doesn't complete. + virtual void ForceImmediateRepoll() = 0; // Return the current activity. // Additionally: @@ -146,58 +130,23 @@ class Activity : private Wakeable { // locked // - back up that assertation with a runtime check in debug builds (it's // prohibitively expensive in non-debug builds) - static Activity* current() ABSL_ASSERT_EXCLUSIVE_LOCK(current()->mu_) { -#ifndef NDEBUG - GPR_ASSERT(g_current_activity_); - if (g_current_activity_ != nullptr) { - g_current_activity_->mu_.AssertHeld(); - } -#endif - return g_current_activity_; - } + static Activity* current() { return g_current_activity_; } // Produce an activity-owning Waker. The produced waker will keep the activity // alive until it's awoken or dropped. - Waker MakeOwningWaker() { - Ref(); - return Waker(this); - } + virtual Waker MakeOwningWaker() = 0; // Produce a non-owning Waker. The waker will own a small heap allocated weak // pointer to this activity. This is more suitable for wakeups that may not be // delivered until long after the activity should be destroyed. - Waker MakeNonOwningWaker() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + virtual Waker MakeNonOwningWaker() = 0; protected: - // Action received during a run, in priority order. - // If more than one action is received during a run, we use max() to resolve - // which one to report (so Cancel overrides Wakeup). - enum class ActionDuringRun : uint8_t { - kNone, // No action occured during run. - kWakeup, // A wakeup occured during run. - kCancel, // Cancel was called during run. - }; - - inline virtual ~Activity() { - if (handle_) { - DropHandle(); - } - } - - // All promise execution occurs under this mutex. - Mutex mu_; - // Check if this activity is the current activity executing on the current // thread. bool is_current() const { return this == g_current_activity_; } // Check if there is an activity executing on the current thread. static bool have_current() { return g_current_activity_ != nullptr; } - // Check if we got an internal wakeup since the last time this function was - // called. - ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - return absl::exchange(action_during_run_, ActionDuringRun::kNone); - } - // Set the current activity at construction, clean it up at destruction. class ScopedActivity { public: @@ -210,58 +159,14 @@ class Activity : private Wakeable { ScopedActivity& operator=(const ScopedActivity&) = delete; }; - // Implementors of Wakeable::Wakeup should call this after the wakeup has - // completed. - void WakeupComplete() { Unref(); } - - // Mark the current activity as being cancelled (so we can actually cancel it - // after polling). - void CancelCurrent() { - current()->SetActionDuringRun(ActionDuringRun::kCancel); - } - private: - class Handle; - - void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } - void Unref() { - if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) { - delete this; - } - } - - // Return a Handle instance with a ref so that it can be stored waiting for - // some wakeup. - Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - // If our refcount is non-zero, ref and return true. - // Otherwise, return false. - bool RefIfNonzero(); - // Drop the (proved existing) wait handle. - void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - // Set the action that occured during this run. - // We use max to combine actions so that cancellation overrides wakeups. - void SetActionDuringRun(ActionDuringRun action) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { - action_during_run_ = std::max(action_during_run_, action); - } - - // Current refcount. - std::atomic refs_{1}; - // If wakeup is called during Promise polling, we set this to Wakeup and - // repoll. If cancel is called during Promise polling, we set this to Cancel - // and cancel at the end of polling. - ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) = - ActionDuringRun::kNone; - // Handle for long waits. Allows a very small weak pointer type object to - // queue for wakeups while Activity may be deleted earlier. - Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr; // Set during RunLoop to the Activity that's executing. // Being set implies that mu_ is held. static GPR_THREAD_LOCAL(Activity*) g_current_activity_; }; // Owned pointer to one Activity. -using ActivityPtr = std::unique_ptr; +using ActivityPtr = OrphanablePtr; namespace promise_detail { @@ -320,6 +225,107 @@ class ActivityContexts : public ContextHolder... { }; }; +// A free standing activity: an activity that owns its own synchronization and +// memory. +// The alternative is an activity that's somehow tied into another system, for +// instance the type seen in promise_based_filter.h as we're transitioning from +// the old filter stack to the new system. +// FreestandingActivity is-a Wakeable, but needs to increment a refcount before +// returning that Wakeable interface. Additionally, we want to keep +// FreestandingActivity as small as is possible, since it will be used +// everywhere. So we use inheritance to provide the Wakeable interface: this +// makes it zero sized, and we make the inheritance private to prevent +// accidental casting. +class FreestandingActivity : public Activity, private Wakeable { + public: + Waker MakeOwningWaker() final { + Ref(); + return Waker(this); + } + Waker MakeNonOwningWaker() final; + + void Orphan() final { + Cancel(); + Unref(); + } + + void ForceImmediateRepoll() final { + mu_.AssertHeld(); + SetActionDuringRun(ActionDuringRun::kWakeup); + } + + protected: + // Action received during a run, in priority order. + // If more than one action is received during a run, we use max() to resolve + // which one to report (so Cancel overrides Wakeup). + enum class ActionDuringRun : uint8_t { + kNone, // No action occured during run. + kWakeup, // A wakeup occured during run. + kCancel, // Cancel was called during run. + }; + + inline ~FreestandingActivity() override { + if (handle_) { + DropHandle(); + } + } + + // Check if we got an internal wakeup since the last time this function was + // called. + ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return absl::exchange(action_during_run_, ActionDuringRun::kNone); + } + + // Implementors of Wakeable::Wakeup should call this after the wakeup has + // completed. + void WakeupComplete() { Unref(); } + + // Set the action that occured during this run. + // We use max to combine actions so that cancellation overrides wakeups. + void SetActionDuringRun(ActionDuringRun action) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + action_during_run_ = std::max(action_during_run_, action); + } + + Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; } + + private: + class Handle; + + // Cancel execution of the underlying promise. + virtual void Cancel() = 0; + + void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) { + delete this; + } + } + + // Return a Handle instance with a ref so that it can be stored waiting for + // some wakeup. + Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + // If our refcount is non-zero, ref and return true. + // Otherwise, return false. + bool RefIfNonzero(); + // Drop the (proved existing) wait handle. + void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + // All promise execution occurs under this mutex. + Mutex mu_; + + // Current refcount. + std::atomic refs_{1}; + // If wakeup is called during Promise polling, we set this to Wakeup and + // repoll. If cancel is called during Promise polling, we set this to Cancel + // and cancel at the end of polling. + ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) = + ActionDuringRun::kNone; + // Handle for long waits. Allows a very small weak pointer type object to + // queue for wakeups while Activity may be deleted earlier. + Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr; +}; + // Implementation details for an Activity of an arbitrary type of promise. // There should exist a static function: // struct WakeupScheduler { @@ -331,8 +337,12 @@ class ActivityContexts : public ContextHolder... { // It can assume that activity will remain live until RunScheduledWakeup() is // invoked, and that a given activity will not be concurrently scheduled again // until its RunScheduledWakeup() has been invoked. +// We use private inheritance here as a way of getting private members for +// each of the contexts. +// TODO(ctiller): We can probably reconsider the private inheritance here +// when we move away from C++11 and have more powerful template features. template -class PromiseActivity final : public Activity, +class PromiseActivity final : public FreestandingActivity, private ActivityContexts { public: using Factory = PromiseFactory; @@ -340,7 +350,7 @@ class PromiseActivity final : public Activity, PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) - : Activity(), + : FreestandingActivity(), ActivityContexts(std::forward(contexts)...), wakeup_scheduler_(std::move(wakeup_scheduler)), on_done_(std::move(on_done)) { @@ -348,9 +358,9 @@ class PromiseActivity final : public Activity, // This may hit a waiter, which could expose our this pointer to other // threads, meaning we do need to hold this mutex even though we're still // constructing. - mu_.Lock(); + mu()->Lock(); auto status = Start(Factory(std::move(promise_factory))); - mu_.Unlock(); + mu()->Unlock(); // We may complete immediately. if (status.has_value()) { on_done_(std::move(*status)); @@ -364,16 +374,24 @@ class PromiseActivity final : public Activity, GPR_ASSERT(done_); } - size_t Size() override { return sizeof(*this); } + void RunScheduledWakeup() { + GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel)); + Step(); + WakeupComplete(); + } + + private: + using typename ActivityContexts::ScopedContext; void Cancel() final { if (Activity::is_current()) { - CancelCurrent(); + mu()->AssertHeld(); + SetActionDuringRun(ActionDuringRun::kCancel); return; } bool was_done; { - MutexLock lock(&mu_); + MutexLock lock(mu()); // Check if we were done, and flag done. was_done = done_; if (!done_) MarkDone(); @@ -384,15 +402,6 @@ class PromiseActivity final : public Activity, } } - void RunScheduledWakeup() { - GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel)); - Step(); - WakeupComplete(); - } - - private: - using typename ActivityContexts::ScopedContext; - // Wakeup this activity. Arrange to poll the activity again at a convenient // time: this could be inline if it's deemed safe, or it could be by passing // the activity to an external threadpool to run. If the activity is already @@ -402,7 +411,8 @@ class PromiseActivity final : public Activity, // If there is an active activity, but hey it's us, flag that and we'll loop // in RunLoop (that's calling from above here!). if (Activity::is_current()) { - WakeupCurrent(); + mu()->AssertHeld(); + SetActionDuringRun(ActionDuringRun::kWakeup); WakeupComplete(); return; } @@ -420,7 +430,7 @@ class PromiseActivity final : public Activity, // Notification that we're no longer executing - it's ok to destruct the // promise. - void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { GPR_ASSERT(!done_); done_ = true; Destruct(&promise_holder_.promise); @@ -428,16 +438,16 @@ class PromiseActivity final : public Activity, // In response to Wakeup, run the Promise state machine again until it // settles. Then check for completion, and if we have completed, call on_done. - void Step() ABSL_LOCKS_EXCLUDED(mu_) { + void Step() ABSL_LOCKS_EXCLUDED(mu()) { // Poll the promise until things settle out under a lock. - mu_.Lock(); + mu()->Lock(); if (done_) { // We might get some spurious wakeups after finishing. - mu_.Unlock(); + mu()->Unlock(); return; } auto status = RunStep(); - mu_.Unlock(); + mu()->Unlock(); if (status.has_value()) { on_done_(std::move(*status)); } @@ -446,7 +456,7 @@ class PromiseActivity final : public Activity, // The main body of a step: set the current activity, and any contexts, and // then run the main polling loop. Contained in a function by itself in // order to keep the scoping rules a little easier in Step(). - absl::optional RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + absl::optional RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { ScopedActivity scoped_activity(this); ScopedContext contexts(this); return StepLoop(); @@ -456,7 +466,7 @@ class PromiseActivity final : public Activity, // promise factory before entering the main loop. Called once from the // constructor. absl::optional Start(Factory promise_factory) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { ScopedActivity scoped_activity(this); ScopedContext contexts(this); Construct(&promise_holder_.promise, promise_factory.Once()); @@ -465,7 +475,7 @@ class PromiseActivity final : public Activity, // Until there are no wakeups from within and the promise is incomplete: // poll the promise. - absl::optional StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + absl::optional StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { GPR_ASSERT(is_current()); while (true) { // Run the promise. @@ -495,7 +505,7 @@ class PromiseActivity final : public Activity, // Callback on completion of the promise. GPR_NO_UNIQUE_ADDRESS OnDone on_done_; // Has execution completed? - GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu_) = false; + GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false; // Is there a wakeup scheduled? GPR_NO_UNIQUE_ADDRESS std::atomic wakeup_scheduled_{false}; // We wrap the promise in a union to allow control over the construction @@ -506,7 +516,7 @@ class PromiseActivity final : public Activity, ~PromiseHolder() {} GPR_NO_UNIQUE_ADDRESS Promise promise; }; - GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu_); + GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu()); }; } // namespace promise_detail diff --git a/src/core/lib/promise/arena_promise.h b/src/core/lib/promise/arena_promise.h index 5a1bc3fa521..14db9843d9e 100644 --- a/src/core/lib/promise/arena_promise.h +++ b/src/core/lib/promise/arena_promise.h @@ -69,7 +69,7 @@ class CallableImpl final : public ImplInterface { public: explicit CallableImpl(Callable&& callable) : callable_(std::move(callable)) {} // Forward polls to the callable object. - Poll PollOnce() override { return callable_(); } + Poll PollOnce() override { return poll_cast(callable_()); } // Destroy destructs the callable object. void Destroy() override { this->~CallableImpl(); } @@ -152,7 +152,8 @@ class ArenaPromise { template ::value>> - explicit ArenaPromise(Callable&& callable) + // NOLINTNEXTLINE(google-explicit-constructor) + ArenaPromise(Callable&& callable) : impl_(arena_promise_detail::MakeImplForCallable( std::forward(callable))) {} @@ -164,6 +165,7 @@ class ArenaPromise { other.impl_ = arena_promise_detail::NullImpl::Get(); } ArenaPromise& operator=(ArenaPromise&& other) noexcept { + impl_->Destroy(); impl_ = other.impl_; other.impl_ = arena_promise_detail::NullImpl::Get(); return *this; diff --git a/src/core/lib/promise/detail/basic_seq.h b/src/core/lib/promise/detail/basic_seq.h index 59ac80f79dc..d3029f10ee1 100644 --- a/src/core/lib/promise/detail/basic_seq.h +++ b/src/core/lib/promise/detail/basic_seq.h @@ -401,6 +401,95 @@ class BasicSeq { } }; +// As above, but models a sequence of unknown size +// At each element, the accumulator A and the current value V is passed to some +// function of type F as f(V, A); f is expected to return a promise that +// resolves to Traits::WrappedType. +template