From 67d6b8ea748433109a6d1c91ea9a8d513bf3e99d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Jan 2024 16:00:28 -0800 Subject: [PATCH] [promise] Context improvements (#35592) A few improvements to the promise context system (more coming) Allow subclassed contexts: If we have multiple different kinds of a base context, allow `GetContext()` to mean `down_cast(GetContext())` everywhere for brevity. Allow custom context lookup: For a base context type, allow customization of how that context is looked up. These two together allow: 1. normalization of activity lookup and context lookup to the same syntax (so we can write `GetContext()` everywhere now 2. Party & Activity to share a context, so that anywhere we need to do a party specific operation we can write `GetContext()->...` and safely know that it's the current activity *and* it's a party. Closes #35592 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35592 from ctiller:contextual-types 37ef948a36eb1864333f6bfa6ce3632e30934993 PiperOrigin-RevId: 599651708 --- CMakeLists.txt | 26 ++++++++-- Package.swift | 1 + build_autogenerated.yaml | 35 +++++++++++++ gRPC-C++.podspec | 3 ++ gRPC-Core.podspec | 3 ++ grpc.gemspec | 1 + grpc.gyp | 3 ++ package.xml | 1 + src/core/BUILD | 6 ++- .../filters/client_channel/client_channel.cc | 10 ++-- .../legacy_compression_filter.cc | 4 +- .../filters/http/server/http_server_filter.cc | 2 +- .../message_size/message_size_filter.cc | 4 +- src/core/lib/channel/connected_channel.cc | 25 ++++----- src/core/lib/channel/promise_based_filter.cc | 8 +-- src/core/lib/promise/activity.h | 10 +++- src/core/lib/promise/context.h | 52 ++++++++++++++++--- src/core/lib/promise/for_each.h | 2 +- src/core/lib/promise/inter_activity_latch.h | 5 +- src/core/lib/promise/inter_activity_pipe.h | 4 +- src/core/lib/promise/latch.h | 6 +-- src/core/lib/promise/mpsc.h | 4 +- src/core/lib/promise/party.h | 11 ++-- src/core/lib/promise/pipe.h | 4 +- src/core/lib/promise/sleep.cc | 2 +- src/core/lib/promise/wait_for_callback.h | 2 +- src/core/lib/resource_quota/memory_quota.cc | 6 +-- .../credentials/oauth2/oauth2_credentials.cc | 3 +- .../credentials/plugin/plugin_credentials.h | 2 +- .../transport/legacy_server_auth_filter.cc | 4 +- .../security/transport/server_auth_filter.cc | 4 +- src/core/lib/surface/call_trace.cc | 18 +++---- src/core/lib/surface/server.cc | 2 +- src/core/lib/surface/wait_for_cq_end_op.h | 2 +- src/core/lib/transport/batch_builder.cc | 5 +- src/core/lib/transport/batch_builder.h | 2 +- src/core/lib/transport/promise_endpoint.h | 4 +- src/core/lib/transport/transport.h | 32 ++++++------ test/core/filters/filter_test.cc | 8 +-- test/core/filters/filter_test_test.cc | 2 +- test/core/promise/activity_test.cc | 4 +- test/core/promise/context_test.cc | 39 ++++++++++++++ test/core/promise/for_each_test.cc | 2 +- test/core/promise/map_pipe_test.cc | 2 +- test/core/promise/party_test.cc | 36 ++++++------- test/core/promise/promise_fuzzer.cc | 4 +- tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 48 files changed, 289 insertions(+), 128 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index effff7e6f67..6e6c48f8c9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2617,6 +2617,7 @@ target_link_libraries(grpc ${_gRPC_RE2_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES} absl::algorithm_container + absl::config absl::cleanup absl::flat_hash_map absl::flat_hash_set @@ -3317,6 +3318,7 @@ target_link_libraries(grpc_unsecure utf8_range_lib ${_gRPC_ZLIB_LIBRARIES} absl::algorithm_container + absl::config absl::cleanup absl::flat_hash_map absl::flat_hash_set @@ -5395,6 +5397,7 @@ target_link_libraries(grpc_authorization_provider ${_gRPC_RE2_LIBRARIES} utf8_range_lib ${_gRPC_ZLIB_LIBRARIES} + absl::config absl::cleanup absl::flat_hash_map absl::flat_hash_set @@ -5906,6 +5909,7 @@ target_include_directories(activity_test target_link_libraries(activity_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -8475,6 +8479,7 @@ target_link_libraries(call_filters_test gtest upb_message_lib utf8_range_lib + absl::config absl::inlined_vector absl::function_ref absl::hash @@ -10059,6 +10064,7 @@ target_link_libraries(chunked_vector_test gtest upb_message_lib utf8_range_lib + absl::config absl::function_ref absl::hash absl::type_traits @@ -11425,6 +11431,8 @@ target_include_directories(context_test target_link_libraries(context_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config + absl::type_traits gpr ) @@ -13146,6 +13154,7 @@ target_link_libraries(exec_ctx_wakeup_scheduler_test gtest upb_message_lib utf8_range_lib + absl::config absl::hash absl::type_traits absl::statusor @@ -13973,6 +13982,7 @@ target_link_libraries(flow_control_test gtest upb_message_lib utf8_range_lib + absl::config absl::function_ref absl::hash absl::type_traits @@ -14058,6 +14068,7 @@ target_link_libraries(for_each_test gtest upb_message_lib utf8_range_lib + absl::config absl::function_ref absl::hash absl::type_traits @@ -17331,6 +17342,7 @@ target_include_directories(inter_activity_pipe_test target_link_libraries(inter_activity_pipe_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -17414,6 +17426,7 @@ target_link_libraries(interceptor_list_test gtest upb_message_lib utf8_range_lib + absl::config absl::function_ref absl::hash absl::type_traits @@ -18144,6 +18157,7 @@ target_include_directories(latch_test target_link_libraries(latch_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -18536,6 +18550,7 @@ target_link_libraries(map_pipe_test gtest upb_message_lib utf8_range_lib + absl::config absl::function_ref absl::hash absl::type_traits @@ -19408,6 +19423,7 @@ target_include_directories(mpsc_test target_link_libraries(mpsc_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -21658,6 +21674,7 @@ target_include_directories(promise_mutex_test target_link_libraries(promise_mutex_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -30760,6 +30777,7 @@ target_include_directories(wait_for_callback_test target_link_libraries(wait_for_callback_test ${_gRPC_ALLTARGETS_LIBRARIES} gtest + absl::config absl::hash absl::type_traits absl::statusor @@ -35274,7 +35292,7 @@ generate_pkgconfig( "gRPC" "high performance general RPC framework" "${gRPC_CORE_VERSION}" - "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" + "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" "libcares openssl re2 zlib" "-lgrpc" "-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" @@ -35285,7 +35303,7 @@ generate_pkgconfig( "gRPC unsecure" "high performance general RPC framework without SSL" "${gRPC_CORE_VERSION}" - "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" + "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" "libcares zlib" "-lgrpc_unsecure" "-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" @@ -35296,7 +35314,7 @@ generate_pkgconfig( "gRPC++" "C++ wrapper for gRPC" "${gRPC_CPP_VERSION}" - "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc" + "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc" "libcares openssl re2 zlib" "-lgrpc++" "-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" @@ -35307,7 +35325,7 @@ generate_pkgconfig( "gRPC++ unsecure" "C++ wrapper for gRPC without SSL" "${gRPC_CPP_VERSION}" - "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure" + "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure" "libcares zlib" "-lgrpc++_unsecure" "-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" diff --git a/Package.swift b/Package.swift index d4406696907..b6963fe3701 100644 --- a/Package.swift +++ b/Package.swift @@ -1408,6 +1408,7 @@ let package = Package( "src/core/lib/gprpp/crash.h", "src/core/lib/gprpp/debug_location.h", "src/core/lib/gprpp/directory_reader.h", + "src/core/lib/gprpp/down_cast.h", "src/core/lib/gprpp/dual_ref_counted.h", "src/core/lib/gprpp/env.h", "src/core/lib/gprpp/examine_stack.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index bf7935fd4ba..d44d1f16f40 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -930,6 +930,7 @@ libs: - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/directory_reader.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/load_file.h @@ -2024,6 +2025,7 @@ libs: - re2 - z - absl/algorithm:container + - absl/base:config - absl/cleanup:cleanup - absl/container:flat_hash_map - absl/container:flat_hash_set @@ -2406,6 +2408,7 @@ libs: - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/load_file.h @@ -3069,6 +3072,7 @@ libs: - utf8_range_lib - z - absl/algorithm:container + - absl/base:config - absl/cleanup:cleanup - absl/container:flat_hash_map - absl/container:flat_hash_set @@ -4447,6 +4451,7 @@ libs: - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/load_file.h @@ -4974,6 +4979,7 @@ libs: - re2 - utf8_range_lib - z + - absl/base:config - absl/cleanup:cleanup - absl/container:flat_hash_map - absl/container:flat_hash_set @@ -5158,6 +5164,7 @@ targets: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -5184,6 +5191,7 @@ targets: - test/core/promise/activity_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -6256,6 +6264,7 @@ targets: - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/chunked_vector.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/manual_constructor.h @@ -6391,6 +6400,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/container:inlined_vector - absl/functional:function_ref - absl/hash:hash @@ -7334,6 +7344,7 @@ targets: - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -7435,6 +7446,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits @@ -7960,11 +7972,14 @@ targets: build: test language: c++ headers: + - src/core/lib/gprpp/down_cast.h - src/core/lib/promise/context.h src: - test/core/promise/context_test.cc deps: - gtest + - absl/base:config + - absl/meta:type_traits - gpr uses_polling: false - name: core_configuration_test @@ -8683,6 +8698,7 @@ targets: - src/core/lib/gpr/spinlock.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -8764,6 +8780,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -9190,6 +9207,7 @@ targets: - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -9297,6 +9315,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits @@ -9321,6 +9340,7 @@ targets: - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -9432,6 +9452,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits @@ -10807,6 +10828,7 @@ targets: headers: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -10829,6 +10851,7 @@ targets: - test/core/promise/inter_activity_pipe_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -10851,6 +10874,7 @@ targets: - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -10954,6 +10978,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits @@ -11281,6 +11306,7 @@ targets: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -11306,6 +11332,7 @@ targets: - test/core/promise/latch_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -11435,6 +11462,7 @@ targets: - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/cpp_impl_of.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -11546,6 +11574,7 @@ targets: - gtest - upb_message_lib - utf8_range_lib + - absl/base:config - absl/functional:function_ref - absl/hash:hash - absl/meta:type_traits @@ -11925,6 +11954,7 @@ targets: language: c++ headers: - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -11942,6 +11972,7 @@ targets: - test/core/promise/mpsc_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -12954,6 +12985,7 @@ targets: - src/core/lib/debug/trace.h - src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/bitset.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted_ptr.h @@ -12980,6 +13012,7 @@ targets: - test/core/promise/promise_mutex_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor @@ -17274,6 +17307,7 @@ targets: language: c++ headers: - src/core/lib/gprpp/atomic_utils.h + - src/core/lib/gprpp/down_cast.h - src/core/lib/gprpp/notification.h - src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/ref_counted.h @@ -17292,6 +17326,7 @@ targets: - test/core/promise/wait_for_callback_test.cc deps: - gtest + - absl/base:config - absl/hash:hash - absl/meta:type_traits - absl/status:statusor diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 77da26b8a61..8c312148ece 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -219,6 +219,7 @@ Pod::Spec.new do |s| abseil_version = '1.20230802.0' ss.dependency 'abseil/algorithm/container', abseil_version ss.dependency 'abseil/base/base', abseil_version + ss.dependency 'abseil/base/config', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/container/flat_hash_map', abseil_version @@ -1012,6 +1013,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/directory_reader.h', + 'src/core/lib/gprpp/down_cast.h', 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/examine_stack.h', @@ -2259,6 +2261,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/directory_reader.h', + 'src/core/lib/gprpp/down_cast.h', 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/examine_stack.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 06bb6be590d..0690efc7710 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -188,6 +188,7 @@ Pod::Spec.new do |s| ss.dependency 'BoringSSL-GRPC', '0.0.31' ss.dependency 'abseil/algorithm/container', abseil_version ss.dependency 'abseil/base/base', abseil_version + ss.dependency 'abseil/base/config', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/container/flat_hash_map', abseil_version @@ -1511,6 +1512,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/directory_reader.h', + 'src/core/lib/gprpp/down_cast.h', 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/examine_stack.cc', @@ -3033,6 +3035,7 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/directory_reader.h', + 'src/core/lib/gprpp/down_cast.h', 'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/examine_stack.h', diff --git a/grpc.gemspec b/grpc.gemspec index 333f2086d0f..359496daf73 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1414,6 +1414,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/gprpp/crash.h ) s.files += %w( src/core/lib/gprpp/debug_location.h ) s.files += %w( src/core/lib/gprpp/directory_reader.h ) + s.files += %w( src/core/lib/gprpp/down_cast.h ) s.files += %w( src/core/lib/gprpp/dual_ref_counted.h ) s.files += %w( src/core/lib/gprpp/env.h ) s.files += %w( src/core/lib/gprpp/examine_stack.cc ) diff --git a/grpc.gyp b/grpc.gyp index 1cd48e8b612..8d6fd1b67a1 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -254,6 +254,7 @@ 're2', 'z', 'absl/algorithm:container', + 'absl/base:config', 'absl/cleanup:cleanup', 'absl/container:flat_hash_map', 'absl/container:flat_hash_set', @@ -1120,6 +1121,7 @@ 'utf8_range_lib', 'z', 'absl/algorithm:container', + 'absl/base:config', 'absl/cleanup:cleanup', 'absl/container:flat_hash_map', 'absl/container:flat_hash_set', @@ -2008,6 +2010,7 @@ 're2', 'utf8_range_lib', 'z', + 'absl/base:config', 'absl/cleanup:cleanup', 'absl/container:flat_hash_map', 'absl/container:flat_hash_set', diff --git a/package.xml b/package.xml index 45453be6bf2..7e911eb1e7c 100644 --- a/package.xml +++ b/package.xml @@ -1396,6 +1396,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index bd1798f7e1e..70d1333d6df 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -495,11 +495,15 @@ grpc_cc_library( grpc_cc_library( name = "context", + external_deps = ["absl/meta:type_traits"], language = "c++", public_hdrs = [ "lib/promise/context.h", ], - deps = ["//:gpr"], + deps = [ + "down_cast", + "//:gpr", + ], ) grpc_cc_library( diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e7583e10ed3..29b5d7ec4d1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -332,7 +332,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { GRPC_CHANNEL_IDLE)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: %striggering exit idle", chand_, - this, Activity::current()->DebugTag().c_str()); + this, GetContext()->DebugTag().c_str()); } // Bounce into the control plane work serializer to start resolving. GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExitIdle"); @@ -349,7 +349,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { auto result = CheckResolution(was_queued_); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: %sCheckResolution returns %s", - chand_, this, Activity::current()->DebugTag().c_str(), + chand_, this, GetContext()->DebugTag().c_str(), result.has_value() ? result->ToString().c_str() : "Pending"); } if (!result.has_value()) return Pending{}; @@ -372,7 +372,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData { void OnAddToQueueLocked() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) { - waker_ = Activity::current()->MakeNonOwningWaker(); + waker_ = GetContext()->MakeNonOwningWaker(); was_queued_ = true; } @@ -3541,7 +3541,7 @@ ClientChannel::PromiseBasedLoadBalancedCall::MakeCallPromise( gpr_log(GPR_INFO, "chand=%p lb_call=%p: %sPickSubchannel() returns %s", chand(), this, - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), result.has_value() ? result->ToString().c_str() : "Pending"); } @@ -3624,7 +3624,7 @@ ClientChannel::PromiseBasedLoadBalancedCall::send_initial_metadata() const { } void ClientChannel::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() { - waker_ = Activity::current()->MakeNonOwningWaker(); + waker_ = GetContext()->MakeNonOwningWaker(); was_queued_ = true; } diff --git a/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc b/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc index 71f9a6d9207..8f62d610b33 100644 --- a/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc +++ b/src/core/ext/filters/http/message_compress/legacy_compression_filter.cc @@ -291,7 +291,7 @@ LegacyServerCompressionFilter::MakeCallPromise( auto r = DecompressMessage(std::move(message), decompress_args); if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), r.status().ToString().c_str()); } if (!r.ok()) { @@ -306,7 +306,7 @@ LegacyServerCompressionFilter::MakeCallPromise( [this, compression_algorithm](ServerMetadataHandle md) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[compression] Write metadata", - Activity::current()->DebugTag().c_str()); + GetContext()->DebugTag().c_str()); } // Find the compression algorithm. *compression_algorithm = HandleOutgoingMetadata(*md); diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 38c2dda0e16..9769e6de89c 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -141,7 +141,7 @@ ServerMetadataHandle HttpServerFilter::Call::OnClientInitialMetadata( void HttpServerFilter::Call::OnServerInitialMetadata(ServerMetadata& md) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[http-server] Write metadata", - Activity::current()->DebugTag().c_str()); + GetContext()->DebugTag().c_str()); } FilterOutgoingMetadata(&md); md.Set(HttpStatusMetadata(), 200); diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index ac010d40e1a..d4b58307512 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -164,8 +164,8 @@ ServerMetadataHandle CheckPayload(const Message& msg, if (!max_length.has_value()) return nullptr; if (GRPC_TRACE_FLAG_ENABLED(grpc_call_trace)) { gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d", - Activity::current()->DebugTag().c_str(), is_send ? "send" : "recv", - msg.payload()->Length(), *max_length); + GetContext()->DebugTag().c_str(), + is_send ? "send" : "recv", msg.payload()->Length(), *max_length); } if (msg.payload()->Length() <= *max_length) return nullptr; auto r = GetContext()->MakePooled(GetContext()); diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index 9ae7e14ee90..2e0689bd54f 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -364,7 +364,7 @@ class ConnectedChannelStream : public Orphanable { grpc_stream_refcount stream_refcount_; StreamPtr stream_; Arena* arena_ = GetContext(); - Party* const party_ = static_cast(Activity::current()); + Party* const party_ = GetContext(); ExternallyObservableLatch finished_; }; @@ -383,17 +383,18 @@ auto ConnectedChannelStream::RecvMessages( gpr_log(GPR_INFO, "%s[connected] RecvMessage: received payload of %" PRIdPTR " bytes", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), pending_message->payload()->Length()); } return Map(incoming_messages.Push(std::move(pending_message)), [](bool ok) -> LoopCtl { if (!ok) { if (grpc_call_trace.enabled()) { - gpr_log(GPR_INFO, - "%s[connected] RecvMessage: failed to " - "push message towards the application", - Activity::current()->DebugTag().c_str()); + gpr_log( + GPR_INFO, + "%s[connected] RecvMessage: failed to " + "push message towards the application", + GetContext()->DebugTag().c_str()); } return absl::OkStatus(); } @@ -406,7 +407,7 @@ auto ConnectedChannelStream::RecvMessages( gpr_log(GPR_INFO, "%s[connected] RecvMessage: reached end of stream with " "status:%s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), status.status().ToString().c_str()); } if (cancel_on_error && !status.ok()) { @@ -444,7 +445,7 @@ ArenaPromise MakeClientCallPromise(Transport* transport, transport->filter_stack_transport()->InitStream(stream->stream(), stream->stream_refcount(), nullptr, GetContext()); - auto* party = static_cast(Activity::current()); + auto* party = GetContext(); party->Spawn("set_polling_entity", call_args.polling_entity->Wait(), [transport, stream = stream->InternalRef()]( grpc_polling_entity polling_entity) { @@ -474,7 +475,7 @@ ArenaPromise MakeClientCallPromise(Transport* transport, if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[connected] Publish client initial metadata: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), server_initial_metadata->DebugString().c_str()); } return Map(pipe->Push(std::move(server_initial_metadata)), @@ -581,7 +582,7 @@ ArenaPromise MakeServerCallPromise( stream->stream(), stream->stream_refcount(), GetContext()->server_call_context()->server_stream_data(), GetContext()); - auto* party = static_cast(Activity::current()); + auto* party = GetContext(); // Arifacts we need for the lifetime of the call. struct CallData { @@ -747,7 +748,7 @@ ArenaPromise MakeServerCallPromise( : nullptr; if (md != nullptr) { call_data->sent_initial_metadata = true; - auto* party = static_cast(Activity::current()); + auto* party = GetContext(); party->Spawn("connected/send_initial_metadata", GetContext()->SendServerInitialMetadata( stream->batch_target(), std::move(md)), @@ -777,7 +778,7 @@ ArenaPromise MakeServerCallPromise( gpr_log( GPR_DEBUG, "%s[connected] Got trailing metadata; status=%s metadata=%s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), status.status().ToString().c_str(), status.ok() ? (*status)->DebugString().c_str() : ""); } diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 8e9632e8b30..96a22a2fb0d 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -374,11 +374,11 @@ void BaseCallData::SendMessage::GotPipe(T* pipe_end) { switch (state_) { case State::kInitial: state_ = State::kIdle; - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); break; case State::kGotBatchNoPipe: state_ = State::kGotBatch; - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); break; case State::kIdle: case State::kGotBatch: @@ -567,7 +567,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher, } if (completed_status_.ok()) { state_ = State::kIdle; - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } else { state_ = State::kCancelled; } @@ -675,7 +675,7 @@ void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) { break; case State::kBatchCompletedNoPipe: state_ = State::kBatchCompleted; - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); break; case State::kIdle: case State::kForwardedBatch: diff --git a/src/core/lib/promise/activity.h b/src/core/lib/promise/activity.h index a05ddab8afb..65b644902c7 100644 --- a/src/core/lib/promise/activity.h +++ b/src/core/lib/promise/activity.h @@ -282,6 +282,12 @@ class ContextHolder> { std::unique_ptr value_; }; +template <> +class Context { + public: + static Activity* get() { return Activity::current(); } +}; + template using ContextTypeFromHeld = typename ContextHolder::ContextType; @@ -632,13 +638,13 @@ ActivityPtr MakeActivity(Factory promise_factory, } inline Pending IntraActivityWaiter::pending() { - wakeups_ |= Activity::current()->CurrentParticipant(); + wakeups_ |= GetContext()->CurrentParticipant(); return Pending(); } inline void IntraActivityWaiter::Wake() { if (wakeups_ == 0) return; - Activity::current()->ForceImmediateRepoll(std::exchange(wakeups_, 0)); + GetContext()->ForceImmediateRepoll(std::exchange(wakeups_, 0)); } } // namespace grpc_core diff --git a/src/core/lib/promise/context.h b/src/core/lib/promise/context.h index 594c2d911fc..ef0d59c9884 100644 --- a/src/core/lib/promise/context.h +++ b/src/core/lib/promise/context.h @@ -19,8 +19,12 @@ #include +#include "absl/meta/type_traits.h" + #include +#include "src/core/lib/gprpp/down_cast.h" + namespace grpc_core { // To avoid accidentally creating context types, we require an explicit @@ -28,17 +32,35 @@ namespace grpc_core { // not contain any members, only exist. // The reason for avoiding this is that context types each use a thread local. template -struct ContextType; // IWYU pragma: keep +struct ContextType; + +// Some contexts can be subclassed. If the subclass is set as that context +// then GetContext() will return the base, and GetContext() will +// down_cast to the derived type. +// Specializations of this type should be created for each derived type, and +// should have a single using statement Base pointing to the derived base class. +// Example: +// class SomeContext {}; +// class SomeDerivedContext : public SomeContext {}; +// template <> struct ContextType {}; +// template <> struct ContextSubclass { +// using Base = SomeContext; +// }; +template +struct ContextSubclass; namespace promise_detail { +template +class Context; + template -class Context : public ContextType { +class ThreadLocalContext : public ContextType { public: - explicit Context(T* p) : old_(current_) { current_ = p; } - ~Context() { current_ = old_; } - Context(const Context&) = delete; - Context& operator=(const Context&) = delete; + explicit ThreadLocalContext(T* p) : old_(current_) { current_ = p; } + ~ThreadLocalContext() { current_ = old_; } + ThreadLocalContext(const ThreadLocalContext&) = delete; + ThreadLocalContext& operator=(const ThreadLocalContext&) = delete; static T* get() { return current_; } @@ -48,7 +70,23 @@ class Context : public ContextType { }; template -thread_local T* Context::current_; +thread_local T* ThreadLocalContext::current_; + +template +class Context())>> + : public ThreadLocalContext { + using ThreadLocalContext::ThreadLocalContext; +}; + +template +class Context::Base>> + : public Context::Base> { + public: + using Context::Base>::Context; + static T* get() { + return down_cast(Context::Base>::get()); + } +}; template class WithContext { diff --git a/src/core/lib/promise/for_each.h b/src/core/lib/promise/for_each.h index 2b8e9b10cc9..1dd6e271ccc 100644 --- a/src/core/lib/promise/for_each.h +++ b/src/core/lib/promise/for_each.h @@ -118,7 +118,7 @@ class ForEach { }; std::string DebugTag() { - return absl::StrCat(Activity::current()->DebugTag(), " FOR_EACH[0x", + return absl::StrCat(GetContext()->DebugTag(), " FOR_EACH[0x", reinterpret_cast(this), "]: "); } diff --git a/src/core/lib/promise/inter_activity_latch.h b/src/core/lib/promise/inter_activity_latch.h index 8c31c8a1625..e27a28e63e2 100644 --- a/src/core/lib/promise/inter_activity_latch.h +++ b/src/core/lib/promise/inter_activity_latch.h @@ -56,7 +56,8 @@ class InterActivityLatch { if (is_set_) { return Empty{}; } else { - return waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); + return waiters_.AddPending( + GetContext()->MakeNonOwningWaker()); } }; } @@ -78,7 +79,7 @@ class InterActivityLatch { private: std::string DebugTag() { - return absl::StrCat(Activity::current()->DebugTag(), + return absl::StrCat(GetContext()->DebugTag(), " INTER_ACTIVITY_LATCH[0x", reinterpret_cast(this), "]: "); } diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h index 4578cbb3c62..d9a2d899637 100644 --- a/src/core/lib/promise/inter_activity_pipe.h +++ b/src/core/lib/promise/inter_activity_pipe.h @@ -42,7 +42,7 @@ class InterActivityPipe { ReleasableMutexLock lock(&mu_); if (closed_) return false; if (count_ == kQueueSize) { - on_available_ = Activity::current()->MakeNonOwningWaker(); + on_available_ = GetContext()->MakeNonOwningWaker(); return Pending{}; } queue_[(first_ + count_) % kQueueSize] = std::move(value); @@ -59,7 +59,7 @@ class InterActivityPipe { ReleasableMutexLock lock(&mu_); if (count_ == 0) { if (closed_) return absl::nullopt; - on_occupied_ = Activity::current()->MakeNonOwningWaker(); + on_occupied_ = GetContext()->MakeNonOwningWaker(); return Pending{}; } auto value = std::move(queue_[first_]); diff --git a/src/core/lib/promise/latch.h b/src/core/lib/promise/latch.h index 67002234b7c..e1252b99d92 100644 --- a/src/core/lib/promise/latch.h +++ b/src/core/lib/promise/latch.h @@ -113,7 +113,7 @@ class Latch { private: std::string DebugTag() { - return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x", + return absl::StrCat(GetContext()->DebugTag(), " LATCH[0x", reinterpret_cast(this), "]: "); } @@ -189,7 +189,7 @@ class Latch { private: std::string DebugTag() { - return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", + return absl::StrCat(GetContext()->DebugTag(), " LATCH(void)[0x", reinterpret_cast(this), "]: "); } @@ -259,7 +259,7 @@ class ExternallyObservableLatch { private: std::string DebugTag() { - return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", + return absl::StrCat(GetContext()->DebugTag(), " LATCH(void)[0x", reinterpret_cast(this), "]: "); } diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h index 8bbbfc4c8ec..38bcf2536b2 100644 --- a/src/core/lib/promise/mpsc.h +++ b/src/core/lib/promise/mpsc.h @@ -62,7 +62,7 @@ class Center : public RefCounted> { bool PollReceiveBatch(std::vector& dest) { ReleasableMutexLock lock(&mu_); if (queue_.empty()) { - receive_waker_ = Activity::current()->MakeNonOwningWaker(); + receive_waker_ = GetContext()->MakeNonOwningWaker(); return false; } dest.swap(queue_); @@ -87,7 +87,7 @@ class Center : public RefCounted> { receive_waker.Wakeup(); return Poll(true); } - send_wakers_.AddPending(Activity::current()->MakeNonOwningWaker()); + send_wakers_.AddPending(GetContext()->MakeNonOwningWaker()); return Pending{}; } diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index a6c0aca1789..d3fd0588f50 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -385,10 +385,10 @@ class Party : public Activity, private Wakeable { // This is useful for implementing batching and the like: we can hold some // action until the rest of the party resolves itself. auto AfterCurrentPoll() { - GPR_DEBUG_ASSERT(Activity::current() == this); + GPR_DEBUG_ASSERT(GetContext() == this); sync_.WakeAfterPoll(CurrentParticipant()); return [this, iteration = sync_.iteration()]() -> Poll { - GPR_DEBUG_ASSERT(Activity::current() == this); + GPR_DEBUG_ASSERT(GetContext() == this); if (iteration == sync_.iteration()) return Pending{}; return Empty{}; }; @@ -561,7 +561,7 @@ class Party : public Activity, private Wakeable { GPR_NO_UNIQUE_ADDRESS Promise promise_; GPR_NO_UNIQUE_ADDRESS Result result_; }; - Waker waiter_{Activity::current()->MakeOwningWaker()}; + Waker waiter_{GetContext()->MakeOwningWaker()}; std::atomic state_{State::kFactory}; }; @@ -608,6 +608,11 @@ class Party : public Activity, private Wakeable { std::atomic participants_[party_detail::kMaxParticipants] = {}; }; +template <> +struct ContextSubclass { + using Base = Activity; +}; + template void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, OnComplete on_complete) { diff --git a/src/core/lib/promise/pipe.h b/src/core/lib/promise/pipe.h index 973a46d5d71..d904557ab9f 100644 --- a/src/core/lib/promise/pipe.h +++ b/src/core/lib/promise/pipe.h @@ -374,7 +374,7 @@ class Center : public InterceptorList { const T& value() const { return value_; } std::string DebugTag() { - if (auto* activity = Activity::current()) { + if (auto* activity = GetContext()) { return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), "]: "); } else { @@ -661,7 +661,7 @@ class Push { if (center_ == nullptr) { if (grpc_trace_promise_primitives.enabled()) { gpr_log(GPR_DEBUG, "%s Pipe push has a null center", - Activity::current()->DebugTag().c_str()); + GetContext()->DebugTag().c_str()); } return false; } diff --git a/src/core/lib/promise/sleep.cc b/src/core/lib/promise/sleep.cc index 8288dc14fdd..4ddae17e0fd 100644 --- a/src/core/lib/promise/sleep.cc +++ b/src/core/lib/promise/sleep.cc @@ -54,7 +54,7 @@ Poll Sleep::operator()() { } Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) - : waker_(Activity::current()->MakeOwningWaker()), + : waker_(GetContext()->MakeOwningWaker()), timer_handle_(GetContext()->RunAfter( deadline - Timestamp::Now(), this)) {} diff --git a/src/core/lib/promise/wait_for_callback.h b/src/core/lib/promise/wait_for_callback.h index 7ebf451fd7a..6dc966c7b19 100644 --- a/src/core/lib/promise/wait_for_callback.h +++ b/src/core/lib/promise/wait_for_callback.h @@ -39,7 +39,7 @@ class WaitForCallback { return [state = state_]() -> Poll { MutexLock lock(&state->mutex); if (state->done) return Empty{}; - state->waker = Activity::current()->MakeNonOwningWaker(); + state->waker = GetContext()->MakeNonOwningWaker(); return Pending{}; }; } diff --git a/src/core/lib/resource_quota/memory_quota.cc b/src/core/lib/resource_quota/memory_quota.cc index fb3542b9dc4..ef4fdacb6a6 100644 --- a/src/core/lib/resource_quota/memory_quota.cc +++ b/src/core/lib/resource_quota/memory_quota.cc @@ -232,10 +232,10 @@ Poll> ReclaimerQueue::PollNext() { if (!empty) { // If we don't, but the queue is probably not empty, schedule an immediate // repoll. - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } else { // Otherwise, schedule a wakeup for whenever something is pushed. - state_->waker = Activity::current()->MakeNonOwningWaker(); + state_->waker = GetContext()->MakeNonOwningWaker(); } return Pending{}; } @@ -465,7 +465,7 @@ void BasicMemoryQuota::Start() { self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) + 1; reclaimer->Run(ReclamationSweep( - self, token, Activity::current()->MakeNonOwningWaker())); + self, token, GetContext()->MakeNonOwningWaker())); // Return a promise that will wait for our barrier. This will be // awoken by the token above being destroyed. So, once that token is // destroyed, we'll be able to proceed. diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 79b0cce1e54..e0533faaf34 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -314,7 +314,8 @@ grpc_oauth2_token_fetcher_credentials::GetRequestMetadata( auto pending_request = grpc_core::MakeRefCounted(); pending_request->pollent = grpc_core::GetContext(); - pending_request->waker = grpc_core::Activity::current()->MakeNonOwningWaker(); + pending_request->waker = + grpc_core::GetContext()->MakeNonOwningWaker(); grpc_polling_entity_add_to_pollset_set( pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_)); pending_request->next = pending_requests_; diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.h b/src/core/lib/security/credentials/plugin/plugin_credentials.h index 56a53a7c080..848d970bc6c 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.h +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.h @@ -103,7 +103,7 @@ struct grpc_plugin_credentials final : public grpc_call_credentials { private: std::atomic ready_{false}; grpc_core::Waker waker_{ - grpc_core::Activity::current()->MakeNonOwningWaker()}; + grpc_core::GetContext()->MakeNonOwningWaker()}; grpc_core::RefCountedPtr call_creds_; grpc_auth_metadata_context context_; grpc_core::ClientMetadataHandle md_; diff --git a/src/core/lib/security/transport/legacy_server_auth_filter.cc b/src/core/lib/security/transport/legacy_server_auth_filter.cc index 7b8da39f2dd..acb10ba0335 100644 --- a/src/core/lib/security/transport/legacy_server_auth_filter.cc +++ b/src/core/lib/security/transport/legacy_server_auth_filter.cc @@ -124,7 +124,7 @@ class LegacyServerAuthFilter::RunApplicationCode { gpr_log(GPR_ERROR, "%s[server-auth]: Delegate to application: filter=%p this=%p " "auth_ctx=%p", - Activity::current()->DebugTag().c_str(), filter, this, + GetContext()->DebugTag().c_str(), filter, this, filter->auth_context_.get()); } filter->server_credentials_->auth_metadata_processor().process( @@ -152,7 +152,7 @@ class LegacyServerAuthFilter::RunApplicationCode { private: struct State { explicit State(CallArgs call_args) : call_args(std::move(call_args)) {} - Waker waker{Activity::current()->MakeOwningWaker()}; + Waker waker{GetContext()->MakeOwningWaker()}; absl::StatusOr call_args; grpc_metadata_array md = MetadataBatchToMetadataArray(call_args->client_initial_metadata.get()); diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index b0713f41770..1ac686a8159 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -123,7 +123,7 @@ grpc_metadata_array MetadataBatchToMetadataArray( struct ServerAuthFilter::RunApplicationCode::State { explicit State(ClientMetadata& client_metadata) : client_metadata(&client_metadata) {} - Waker waker{Activity::current()->MakeOwningWaker()}; + Waker waker{GetContext()->MakeOwningWaker()}; absl::StatusOr client_metadata; grpc_metadata_array md = MetadataBatchToMetadataArray(*client_metadata); std::atomic done{false}; @@ -136,7 +136,7 @@ ServerAuthFilter::RunApplicationCode::RunApplicationCode( gpr_log(GPR_ERROR, "%s[server-auth]: Delegate to application: filter=%p this=%p " "auth_ctx=%p", - Activity::current()->DebugTag().c_str(), filter, this, + GetContext()->DebugTag().c_str(), filter, this, filter->auth_context_.get()); } filter->server_credentials_->auth_metadata_processor().process( diff --git a/src/core/lib/surface/call_trace.cc b/src/core/lib/surface/call_trace.cc index 9b198f4abc8..78b703313ef 100644 --- a/src/core/lib/surface/call_trace.cc +++ b/src/core/lib/surface/call_trace.cc @@ -56,22 +56,22 @@ const grpc_channel_filter* PromiseTracingFilterFor( gpr_log( GPR_DEBUG, "%s[%s] CreateCallPromise: client_initial_metadata=%s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, call_args.client_initial_metadata->DebugString().c_str()); return [source_filter, child = next_promise_factory( std::move(call_args))]() mutable { gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: begin", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name); auto r = child(); if (auto* p = r.value_if_ready()) { gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: done: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, (*p)->DebugString().c_str()); } else { gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: <>", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name); } return r; @@ -84,35 +84,35 @@ const grpc_channel_filter* PromiseTracingFilterFor( call->client_initial_metadata().receiver.InterceptAndMap( [source_filter](ClientMetadataHandle md) { gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, md->DebugString().c_str()); return md; }); call->client_to_server_messages().receiver.InterceptAndMap( [source_filter](MessageHandle msg) { gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, msg->DebugString().c_str()); return msg; }); call->server_initial_metadata().sender.InterceptAndMap( [source_filter](ServerMetadataHandle md) { gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, md->DebugString().c_str()); return md; }); call->server_to_client_messages().sender.InterceptAndMap( [source_filter](MessageHandle msg) { gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, msg->DebugString().c_str()); return msg; }); call->server_trailing_metadata().sender.InterceptAndMap( [source_filter](ServerMetadataHandle md) { gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s", - Activity::current()->DebugTag().c_str(), + GetContext()->DebugTag().c_str(), source_filter->name, md->DebugString().c_str()); return md; }); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index e53a609cfb1..bbb0aeff773 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -525,7 +525,7 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface { "Too many pending requests for this server")); } auto w = std::make_shared( - Activity::current()->MakeOwningWaker()); + GetContext()->MakeOwningWaker()); pending_.push(w); return OnCancel( [w]() -> Poll> { diff --git a/src/core/lib/surface/wait_for_cq_end_op.h b/src/core/lib/surface/wait_for_cq_end_op.h index 896e629503c..345ececc581 100644 --- a/src/core/lib/surface/wait_for_cq_end_op.h +++ b/src/core/lib/surface/wait_for_cq_end_op.h @@ -41,7 +41,7 @@ class WaitForCqEndOp { } else { auto not_started = std::move(*n); auto& started = - state_.emplace(Activity::current()->MakeOwningWaker()); + state_.emplace(GetContext()->MakeOwningWaker()); grpc_cq_end_op( not_started.cq, not_started.tag, std::move(not_started.error), [](void* p, grpc_cq_completion*) { diff --git a/src/core/lib/transport/batch_builder.cc b/src/core/lib/transport/batch_builder.cc index b87e7dcf91b..4bfccd3df99 100644 --- a/src/core/lib/transport/batch_builder.cc +++ b/src/core/lib/transport/batch_builder.cc @@ -55,8 +55,7 @@ BatchBuilder::PendingCompletion::PendingCompletion(RefCountedPtr batch) BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload, grpc_stream_refcount* stream_refcount) - : party(static_cast(Activity::current())->Ref()), - stream_refcount(stream_refcount) { + : party(GetContext()->Ref()), stream_refcount(stream_refcount) { batch.payload = payload; batch.is_traced = GetContext()->traced(); #ifndef NDEBUG @@ -70,7 +69,7 @@ BatchBuilder::Batch::~Batch() { auto* arena = party->arena(); if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy", - Activity::current()->DebugTag().c_str(), this); + GetContext()->DebugTag().c_str(), this); } if (pending_receive_message != nullptr) { arena->DeletePooled(pending_receive_message); diff --git a/src/core/lib/transport/batch_builder.h b/src/core/lib/transport/batch_builder.h index 70d0b7554c2..427de2305c1 100644 --- a/src/core/lib/transport/batch_builder.h +++ b/src/core/lib/transport/batch_builder.h @@ -202,7 +202,7 @@ class BatchBuilder { ~Batch(); Batch(const Batch&) = delete; Batch& operator=(const Batch&) = delete; - std::string DebugPrefix(Activity* activity = Activity::current()) const { + std::string DebugPrefix(Activity* activity = GetContext()) const { return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(), this); } diff --git a/src/core/lib/transport/promise_endpoint.h b/src/core/lib/transport/promise_endpoint.h index 9e90a90e87a..6346962cfe3 100644 --- a/src/core/lib/transport/promise_endpoint.h +++ b/src/core/lib/transport/promise_endpoint.h @@ -80,7 +80,7 @@ class PromiseEndpoint { data.c_slice_buffer()); // If `Write()` returns true immediately, the callback will not be called. // We still need to call our callback to pick up the result. - write_state_->waker = Activity::current()->MakeNonOwningWaker(); + write_state_->waker = GetContext()->MakeNonOwningWaker(); completed = endpoint_->Write( [write_state = write_state_](absl::Status status) { ApplicationCallbackExecCtx callback_exec_ctx; @@ -124,7 +124,7 @@ class PromiseEndpoint { static_cast(num_bytes - read_state_->buffer.Length())}; // If `Read()` returns true immediately, the callback will not be // called. - read_state_->waker = Activity::current()->MakeNonOwningWaker(); + read_state_->waker = GetContext()->MakeNonOwningWaker(); if (endpoint_->Read( [read_state = read_state_, num_bytes](absl::Status status) { ApplicationCallbackExecCtx callback_exec_ctx; diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index b448eece6e0..0815e2f72be 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -203,7 +203,7 @@ class CallSpineInterface { // and this construction supports that (and has helped the author not write // some bugs). GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) { - GPR_DEBUG_ASSERT(Activity::current() == &party()); + GPR_DEBUG_ASSERT(GetContext() == &party()); auto& c = cancel_latch(); if (c.is_set()) return absl::nullopt; c.Set(std::move(metadata)); @@ -216,7 +216,7 @@ class CallSpineInterface { } auto WaitForCancel() { - GPR_DEBUG_ASSERT(Activity::current() == &party()); + GPR_DEBUG_ASSERT(GetContext() == &party()); return cancel_latch().Wait(); } @@ -225,7 +225,7 @@ class CallSpineInterface { // The resulting (returned) promise will resolve to Empty. template auto CancelIfFails(Promise promise) { - GPR_DEBUG_ASSERT(Activity::current() == &party()); + GPR_DEBUG_ASSERT(GetContext() == &party()); using P = promise_detail::PromiseLike; using ResultType = typename P::Result; return Map(std::move(promise), [this](ResultType r) { @@ -349,13 +349,13 @@ class CallInitiator { : spine_(std::move(spine)) {} auto PushClientInitialMetadata(ClientMetadataHandle md) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map(spine_->client_initial_metadata().sender.Push(std::move(md)), [](bool ok) { return StatusFlag(ok); }); } auto PullServerInitialMetadata() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map(spine_->server_initial_metadata().receiver.Next(), [](NextResult md) -> ValueOrFailure { @@ -365,7 +365,7 @@ class CallInitiator { } auto PullServerTrailingMetadata() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Race(spine_->WaitForCancel(), Map(spine_->server_trailing_metadata().receiver.Next(), [spine = spine_](NextResult md) @@ -376,19 +376,19 @@ class CallInitiator { } auto PullMessage() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return spine_->server_to_client_messages().receiver.Next(); } auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map( spine_->client_to_server_messages().sender.Push(std::move(message)), [](bool r) { return StatusFlag(r); }); } void FinishSends() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); spine_->client_to_server_messages().sender.Close(); } @@ -398,7 +398,7 @@ class CallInitiator { } void Cancel() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); std::ignore = spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); } @@ -430,7 +430,7 @@ class CallHandler { : spine_(std::move(spine)) {} auto PullClientInitialMetadata() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map(spine_->client_initial_metadata().receiver.Next(), [](NextResult md) -> ValueOrFailure { @@ -440,13 +440,13 @@ class CallHandler { } auto PushServerInitialMetadata(ServerMetadataHandle md) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map(spine_->server_initial_metadata().sender.Push(std::move(md)), [](bool ok) { return StatusFlag(ok); }); } auto PushServerTrailingMetadata(ServerMetadataHandle md) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); spine_->server_to_client_messages().sender.Close(); spine_->CallOnDone(); return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), @@ -454,19 +454,19 @@ class CallHandler { } auto PullMessage() { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return spine_->client_to_server_messages().receiver.Next(); } auto PushMessage(MessageHandle message) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); return Map( spine_->server_to_client_messages().sender.Push(std::move(message)), [](bool ok) { return StatusFlag(ok); }); } void Cancel(ServerMetadataHandle status) { - GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); + GPR_DEBUG_ASSERT(GetContext() == &spine_->party()); std::ignore = spine_->Cancel(std::move(status)); } diff --git a/test/core/filters/filter_test.cc b/test/core/filters/filter_test.cc index cb2ec168b80..9089933336c 100644 --- a/test/core/filters/filter_test.cc +++ b/test/core/filters/filter_test.cc @@ -219,7 +219,7 @@ bool FilterTestBase::Call::Impl::StepOnce() { events().ForwardedMessageServerToClient(call_, *p->value()); } next_server_to_client_messages_.reset(); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } } @@ -229,7 +229,7 @@ bool FilterTestBase::Call::Impl::StepOnce() { server_to_client_messages_sender_->Push( std::move(forward_server_to_client_messages_.front()))); forward_server_to_client_messages_.pop(); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } } @@ -251,7 +251,7 @@ bool FilterTestBase::Call::Impl::StepOnce() { events().ForwardedMessageClientToServer(call_, *p->value()); } next_client_to_server_messages_.reset(); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } } @@ -261,7 +261,7 @@ bool FilterTestBase::Call::Impl::StepOnce() { pipe_client_to_server_messages_.sender.Push( std::move(forward_client_to_server_messages_.front()))); forward_client_to_server_messages_.pop(); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); } } diff --git a/test/core/filters/filter_test_test.cc b/test/core/filters/filter_test_test.cc index 565914dd038..e0dad7ad671 100644 --- a/test/core/filters/filter_test_test.cc +++ b/test/core/filters/filter_test_test.cc @@ -64,7 +64,7 @@ class DelayStartFilter final : public ChannelFilter { [args = std::move(args), i = 10]() mutable -> Poll { --i; if (i == 0) return std::move(args); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); return Pending{}; }, next); diff --git a/test/core/promise/activity_test.cc b/test/core/promise/activity_test.cc index 3f2726dfa53..eccd2c8c9bc 100644 --- a/test/core/promise/activity_test.cc +++ b/test/core/promise/activity_test.cc @@ -49,7 +49,7 @@ class Barrier { if (cleared_) { return Result{}; } else { - return wait_set_.AddPending(Activity::current()->MakeOwningWaker()); + return wait_set_.AddPending(GetContext()->MakeOwningWaker()); } }; } @@ -80,7 +80,7 @@ class SingleBarrier { if (cleared_) { return Result{}; } else { - waker_ = Activity::current()->MakeOwningWaker(); + waker_ = GetContext()->MakeOwningWaker(); return Pending(); } }; diff --git a/test/core/promise/context_test.cc b/test/core/promise/context_test.cc index 71d09aba858..6c2457ccd7e 100644 --- a/test/core/promise/context_test.cc +++ b/test/core/promise/context_test.cc @@ -42,6 +42,45 @@ TEST(Context, WithContext) { EXPECT_TRUE(test.done); } +class BaseContext { + public: + virtual int Answer() = 0; + + protected: + ~BaseContext() = default; +}; + +class CorrectContext final : public BaseContext { + public: + int Answer() override { return 42; } +}; + +class IncorrectContext final : public BaseContext { + public: + int Answer() override { return 0; } +}; + +template <> +struct ContextType {}; +template <> +struct ContextSubclass { + using Base = BaseContext; +}; +template <> +struct ContextSubclass { + using Base = BaseContext; +}; + +TEST(Context, ContextSubclass) { + CorrectContext correct; + IncorrectContext incorrect; + EXPECT_EQ(42, + WithContext([]() { return GetContext()->Answer(); }, + &correct)()); + EXPECT_EQ(0, WithContext([]() { return GetContext()->Answer(); }, + &incorrect)()); +} + } // namespace grpc_core int main(int argc, char** argv) { diff --git a/test/core/promise/for_each_test.cc b/test/core/promise/for_each_test.cc index 94287697d50..b251176a332 100644 --- a/test/core/promise/for_each_test.cc +++ b/test/core/promise/for_each_test.cc @@ -138,7 +138,7 @@ class MoveableUntilPolled { } Poll operator()() { - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); ++polls_; if (polls_ == 10) return absl::OkStatus(); return Pending(); diff --git a/test/core/promise/map_pipe_test.cc b/test/core/promise/map_pipe_test.cc index 8cd88aa6187..f8faa3d01bd 100644 --- a/test/core/promise/map_pipe_test.cc +++ b/test/core/promise/map_pipe_test.cc @@ -47,7 +47,7 @@ class Delayed { explicit Delayed(T x) : x_(x) {} Poll operator()() { - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); ++polls_; if (polls_ == 10) return std::move(x_); return Pending(); diff --git a/test/core/promise/party_test.cc b/test/core/promise/party_test.cc index 6f915c26805..6bdf6696da7 100644 --- a/test/core/promise/party_test.cc +++ b/test/core/promise/party_test.cc @@ -284,10 +284,10 @@ TEST_F(PartyTest, CanSpawnAndRun) { party->Spawn( "TestSpawn", [i = 10]() mutable -> Poll { - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); gpr_log(GPR_DEBUG, "i=%d", i); GPR_ASSERT(i > 0); - Activity::current()->ForceImmediateRepoll(); + GetContext()->ForceImmediateRepoll(); --i; if (i == 0) return 42; return Pending{}; @@ -331,12 +331,12 @@ TEST_F(PartyTest, CanSpawnFromSpawn) { party->Spawn( "TestSpawn", [party, &n2]() -> Poll { - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); party->Spawn( "TestSpawnInner", [i = 10]() mutable -> Poll { - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - Activity::current()->ForceImmediateRepoll(); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + GetContext()->ForceImmediateRepoll(); --i; if (i == 0) return 42; return Pending{}; @@ -363,8 +363,8 @@ TEST_F(PartyTest, CanWakeupWithOwningWaker) { party->Spawn( "TestSpawn", [i = 0, &waker, &n]() mutable -> Poll { - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - waker = Activity::current()->MakeOwningWaker(); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + waker = GetContext()->MakeOwningWaker(); n[i].Notify(); i++; if (i == 10) return 42; @@ -389,8 +389,8 @@ TEST_F(PartyTest, CanWakeupWithNonOwningWaker) { party->Spawn( "TestSpawn", [i = 10, &waker, &n]() mutable -> Poll { - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - waker = Activity::current()->MakeNonOwningWaker(); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + waker = GetContext()->MakeNonOwningWaker(); --i; n[9 - i].Notify(); if (i == 0) return 42; @@ -416,8 +416,8 @@ TEST_F(PartyTest, CanWakeupWithNonOwningWakerAfterOrphaning) { "TestSpawn", [&waker, &set_waker]() mutable -> Poll { EXPECT_FALSE(set_waker.HasBeenNotified()); - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - waker = Activity::current()->MakeNonOwningWaker(); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + waker = GetContext()->MakeNonOwningWaker(); set_waker.Notify(); return Pending{}; }, @@ -437,9 +437,9 @@ TEST_F(PartyTest, CanDropNonOwningWakeAfterOrphaning) { "TestSpawn", [&waker, &set_waker]() mutable -> Poll { EXPECT_FALSE(set_waker.HasBeenNotified()); - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - waker = - std::make_unique(Activity::current()->MakeNonOwningWaker()); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + waker = std::make_unique( + GetContext()->MakeNonOwningWaker()); set_waker.Notify(); return Pending{}; }, @@ -458,8 +458,8 @@ TEST_F(PartyTest, CanWakeupNonOwningOrphanedWakerWithNoEffect) { "TestSpawn", [&waker, &set_waker]() mutable -> Poll { EXPECT_FALSE(set_waker.HasBeenNotified()); - EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); - waker = Activity::current()->MakeNonOwningWaker(); + EXPECT_EQ(GetContext()->DebugTag(), "TestParty"); + waker = GetContext()->MakeNonOwningWaker(); set_waker.Notify(); return Pending{}; }, @@ -574,9 +574,9 @@ class PromiseNotification { if (done_) return 42; if (!polled_) { if (owning_waker_) { - waker_ = Activity::current()->MakeOwningWaker(); + waker_ = GetContext()->MakeOwningWaker(); } else { - waker_ = Activity::current()->MakeNonOwningWaker(); + waker_ = GetContext()->MakeNonOwningWaker(); } polled_ = true; } diff --git a/test/core/promise/promise_fuzzer.cc b/test/core/promise/promise_fuzzer.cc index db13409435f..b87efba4003 100644 --- a/test/core/promise/promise_fuzzer.cc +++ b/test/core/promise/promise_fuzzer.cc @@ -304,10 +304,10 @@ class Fuzzer { if (!called) { if (config.owning()) { wakers_[config.waker()].push_back( - Activity::current()->MakeOwningWaker()); + GetContext()->MakeOwningWaker()); } else { wakers_[config.waker()].push_back( - Activity::current()->MakeNonOwningWaker()); + GetContext()->MakeNonOwningWaker()); } return Pending(); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 0aae5307e93..6879f853694 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2413,6 +2413,7 @@ src/core/lib/gprpp/crash.cc \ src/core/lib/gprpp/crash.h \ src/core/lib/gprpp/debug_location.h \ src/core/lib/gprpp/directory_reader.h \ +src/core/lib/gprpp/down_cast.h \ src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/env.h \ src/core/lib/gprpp/examine_stack.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 50073e7a7e1..312ff9a1b6d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2191,6 +2191,7 @@ src/core/lib/gprpp/crash.cc \ src/core/lib/gprpp/crash.h \ src/core/lib/gprpp/debug_location.h \ src/core/lib/gprpp/directory_reader.h \ +src/core/lib/gprpp/down_cast.h \ src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/env.h \ src/core/lib/gprpp/examine_stack.cc \