[promise] Context improvements (#35592)

A few improvements to the promise context system (more coming)

Allow subclassed contexts:
If we have multiple different kinds of a base context, allow `GetContext<Derived>()` to mean `down_cast<Derived*>(GetContext<Base>())` everywhere for brevity.

Allow custom context lookup:
For a base context type, allow customization of how that context is looked up.

These two together allow:
1. normalization of activity lookup and context lookup to the same syntax (so we can write `GetContext<Activity>()` everywhere now
2. Party & Activity to share a context, so that anywhere we need to do a party specific operation we can write `GetContext<Party>()->...` and safely know that it's the current activity *and* it's a party.

Closes #35592

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35592 from ctiller:contextual-types 37ef948a36
PiperOrigin-RevId: 599651708
pull/35604/head
Craig Tiller 11 months ago committed by Copybara-Service
parent f37f7c9b4b
commit 67d6b8ea74
  1. 26
      CMakeLists.txt
  2. 1
      Package.swift
  3. 35
      build_autogenerated.yaml
  4. 3
      gRPC-C++.podspec
  5. 3
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 3
      grpc.gyp
  8. 1
      package.xml
  9. 6
      src/core/BUILD
  10. 10
      src/core/ext/filters/client_channel/client_channel.cc
  11. 4
      src/core/ext/filters/http/message_compress/legacy_compression_filter.cc
  12. 2
      src/core/ext/filters/http/server/http_server_filter.cc
  13. 4
      src/core/ext/filters/message_size/message_size_filter.cc
  14. 25
      src/core/lib/channel/connected_channel.cc
  15. 8
      src/core/lib/channel/promise_based_filter.cc
  16. 10
      src/core/lib/promise/activity.h
  17. 52
      src/core/lib/promise/context.h
  18. 2
      src/core/lib/promise/for_each.h
  19. 5
      src/core/lib/promise/inter_activity_latch.h
  20. 4
      src/core/lib/promise/inter_activity_pipe.h
  21. 6
      src/core/lib/promise/latch.h
  22. 4
      src/core/lib/promise/mpsc.h
  23. 11
      src/core/lib/promise/party.h
  24. 4
      src/core/lib/promise/pipe.h
  25. 2
      src/core/lib/promise/sleep.cc
  26. 2
      src/core/lib/promise/wait_for_callback.h
  27. 6
      src/core/lib/resource_quota/memory_quota.cc
  28. 3
      src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
  29. 2
      src/core/lib/security/credentials/plugin/plugin_credentials.h
  30. 4
      src/core/lib/security/transport/legacy_server_auth_filter.cc
  31. 4
      src/core/lib/security/transport/server_auth_filter.cc
  32. 18
      src/core/lib/surface/call_trace.cc
  33. 2
      src/core/lib/surface/server.cc
  34. 2
      src/core/lib/surface/wait_for_cq_end_op.h
  35. 5
      src/core/lib/transport/batch_builder.cc
  36. 2
      src/core/lib/transport/batch_builder.h
  37. 4
      src/core/lib/transport/promise_endpoint.h
  38. 32
      src/core/lib/transport/transport.h
  39. 8
      test/core/filters/filter_test.cc
  40. 2
      test/core/filters/filter_test_test.cc
  41. 4
      test/core/promise/activity_test.cc
  42. 39
      test/core/promise/context_test.cc
  43. 2
      test/core/promise/for_each_test.cc
  44. 2
      test/core/promise/map_pipe_test.cc
  45. 36
      test/core/promise/party_test.cc
  46. 4
      test/core/promise/promise_fuzzer.cc
  47. 1
      tools/doxygen/Doxyfile.c++.internal
  48. 1
      tools/doxygen/Doxyfile.core.internal

26
CMakeLists.txt generated

@ -2617,6 +2617,7 @@ target_link_libraries(grpc
${_gRPC_RE2_LIBRARIES} ${_gRPC_RE2_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES}
absl::algorithm_container absl::algorithm_container
absl::config
absl::cleanup absl::cleanup
absl::flat_hash_map absl::flat_hash_map
absl::flat_hash_set absl::flat_hash_set
@ -3317,6 +3318,7 @@ target_link_libraries(grpc_unsecure
utf8_range_lib utf8_range_lib
${_gRPC_ZLIB_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES}
absl::algorithm_container absl::algorithm_container
absl::config
absl::cleanup absl::cleanup
absl::flat_hash_map absl::flat_hash_map
absl::flat_hash_set absl::flat_hash_set
@ -5395,6 +5397,7 @@ target_link_libraries(grpc_authorization_provider
${_gRPC_RE2_LIBRARIES} ${_gRPC_RE2_LIBRARIES}
utf8_range_lib utf8_range_lib
${_gRPC_ZLIB_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES}
absl::config
absl::cleanup absl::cleanup
absl::flat_hash_map absl::flat_hash_map
absl::flat_hash_set absl::flat_hash_set
@ -5906,6 +5909,7 @@ target_include_directories(activity_test
target_link_libraries(activity_test target_link_libraries(activity_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -8475,6 +8479,7 @@ target_link_libraries(call_filters_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::inlined_vector absl::inlined_vector
absl::function_ref absl::function_ref
absl::hash absl::hash
@ -10059,6 +10064,7 @@ target_link_libraries(chunked_vector_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
@ -11425,6 +11431,8 @@ target_include_directories(context_test
target_link_libraries(context_test target_link_libraries(context_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::type_traits
gpr gpr
) )
@ -13146,6 +13154,7 @@ target_link_libraries(exec_ctx_wakeup_scheduler_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -13973,6 +13982,7 @@ target_link_libraries(flow_control_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
@ -14058,6 +14068,7 @@ target_link_libraries(for_each_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
@ -17331,6 +17342,7 @@ target_include_directories(inter_activity_pipe_test
target_link_libraries(inter_activity_pipe_test target_link_libraries(inter_activity_pipe_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -17414,6 +17426,7 @@ target_link_libraries(interceptor_list_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
@ -18144,6 +18157,7 @@ target_include_directories(latch_test
target_link_libraries(latch_test target_link_libraries(latch_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -18536,6 +18550,7 @@ target_link_libraries(map_pipe_test
gtest gtest
upb_message_lib upb_message_lib
utf8_range_lib utf8_range_lib
absl::config
absl::function_ref absl::function_ref
absl::hash absl::hash
absl::type_traits absl::type_traits
@ -19408,6 +19423,7 @@ target_include_directories(mpsc_test
target_link_libraries(mpsc_test target_link_libraries(mpsc_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -21658,6 +21674,7 @@ target_include_directories(promise_mutex_test
target_link_libraries(promise_mutex_test target_link_libraries(promise_mutex_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -30760,6 +30777,7 @@ target_include_directories(wait_for_callback_test
target_link_libraries(wait_for_callback_test target_link_libraries(wait_for_callback_test
${_gRPC_ALLTARGETS_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES}
gtest gtest
absl::config
absl::hash absl::hash
absl::type_traits absl::type_traits
absl::statusor absl::statusor
@ -35274,7 +35292,7 @@ generate_pkgconfig(
"gRPC" "gRPC"
"high performance general RPC framework" "high performance general RPC framework"
"${gRPC_CORE_VERSION}" "${gRPC_CORE_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"libcares openssl re2 zlib" "libcares openssl re2 zlib"
"-lgrpc" "-lgrpc"
"-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" "-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib"
@ -35285,7 +35303,7 @@ generate_pkgconfig(
"gRPC unsecure" "gRPC unsecure"
"high performance general RPC framework without SSL" "high performance general RPC framework without SSL"
"${gRPC_CORE_VERSION}" "${gRPC_CORE_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr" "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr"
"libcares zlib" "libcares zlib"
"-lgrpc_unsecure" "-lgrpc_unsecure"
"-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" "-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib"
@ -35296,7 +35314,7 @@ generate_pkgconfig(
"gRPC++" "gRPC++"
"C++ wrapper for gRPC" "C++ wrapper for gRPC"
"${gRPC_CPP_VERSION}" "${gRPC_CPP_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc" "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc"
"libcares openssl re2 zlib" "libcares openssl re2 zlib"
"-lgrpc++" "-lgrpc++"
"-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" "-laddress_sorting -lupb_textformat_lib -lupb_json_lib -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib"
@ -35307,7 +35325,7 @@ generate_pkgconfig(
"gRPC++ unsecure" "gRPC++ unsecure"
"C++ wrapper for gRPC without SSL" "C++ wrapper for gRPC without SSL"
"${gRPC_CPP_VERSION}" "${gRPC_CPP_VERSION}"
"absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure" "absl_algorithm_container absl_any_invocable absl_base absl_bind_front absl_cleanup absl_config absl_cord absl_core_headers absl_flags absl_flags_marshalling absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_bit_gen_ref absl_random_distributions absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant gpr grpc_unsecure"
"libcares zlib" "libcares zlib"
"-lgrpc++_unsecure" "-lgrpc++_unsecure"
"-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib" "-laddress_sorting -lutf8_range_lib -lupb_message_lib -lupb_mem_lib -lupb_base_lib"

1
Package.swift generated

@ -1408,6 +1408,7 @@ let package = Package(
"src/core/lib/gprpp/crash.h", "src/core/lib/gprpp/crash.h",
"src/core/lib/gprpp/debug_location.h", "src/core/lib/gprpp/debug_location.h",
"src/core/lib/gprpp/directory_reader.h", "src/core/lib/gprpp/directory_reader.h",
"src/core/lib/gprpp/down_cast.h",
"src/core/lib/gprpp/dual_ref_counted.h", "src/core/lib/gprpp/dual_ref_counted.h",
"src/core/lib/gprpp/env.h", "src/core/lib/gprpp/env.h",
"src/core/lib/gprpp/examine_stack.cc", "src/core/lib/gprpp/examine_stack.cc",

@ -930,6 +930,7 @@ libs:
- src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/directory_reader.h - src/core/lib/gprpp/directory_reader.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/load_file.h - src/core/lib/gprpp/load_file.h
@ -2024,6 +2025,7 @@ libs:
- re2 - re2
- z - z
- absl/algorithm:container - absl/algorithm:container
- absl/base:config
- absl/cleanup:cleanup - absl/cleanup:cleanup
- absl/container:flat_hash_map - absl/container:flat_hash_map
- absl/container:flat_hash_set - absl/container:flat_hash_set
@ -2406,6 +2408,7 @@ libs:
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/load_file.h - src/core/lib/gprpp/load_file.h
@ -3069,6 +3072,7 @@ libs:
- utf8_range_lib - utf8_range_lib
- z - z
- absl/algorithm:container - absl/algorithm:container
- absl/base:config
- absl/cleanup:cleanup - absl/cleanup:cleanup
- absl/container:flat_hash_map - absl/container:flat_hash_map
- absl/container:flat_hash_set - absl/container:flat_hash_set
@ -4447,6 +4451,7 @@ libs:
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/load_file.h - src/core/lib/gprpp/load_file.h
@ -4974,6 +4979,7 @@ libs:
- re2 - re2
- utf8_range_lib - utf8_range_lib
- z - z
- absl/base:config
- absl/cleanup:cleanup - absl/cleanup:cleanup
- absl/container:flat_hash_map - absl/container:flat_hash_map
- absl/container:flat_hash_set - absl/container:flat_hash_set
@ -5158,6 +5164,7 @@ targets:
- src/core/lib/debug/trace.h - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/ref_counted_ptr.h
@ -5184,6 +5191,7 @@ targets:
- test/core/promise/activity_test.cc - test/core/promise/activity_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -6256,6 +6264,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/dual_ref_counted.h - src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/if_list.h - src/core/lib/gprpp/if_list.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
@ -6391,6 +6400,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/container:inlined_vector - absl/container:inlined_vector
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
@ -7334,6 +7344,7 @@ targets:
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/chunked_vector.h - src/core/lib/gprpp/chunked_vector.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -7435,6 +7446,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
@ -7960,11 +7972,14 @@ targets:
build: test build: test
language: c++ language: c++
headers: headers:
- src/core/lib/gprpp/down_cast.h
- src/core/lib/promise/context.h - src/core/lib/promise/context.h
src: src:
- test/core/promise/context_test.cc - test/core/promise/context_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/meta:type_traits
- gpr - gpr
uses_polling: false uses_polling: false
- name: core_configuration_test - name: core_configuration_test
@ -8683,6 +8698,7 @@ targets:
- src/core/lib/gpr/spinlock.h - src/core/lib/gpr/spinlock.h
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -8764,6 +8780,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -9190,6 +9207,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -9297,6 +9315,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
@ -9321,6 +9340,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -9432,6 +9452,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
@ -10807,6 +10828,7 @@ targets:
headers: headers:
- src/core/lib/debug/trace.h - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/ref_counted_ptr.h
@ -10829,6 +10851,7 @@ targets:
- test/core/promise/inter_activity_pipe_test.cc - test/core/promise/inter_activity_pipe_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -10851,6 +10874,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -10954,6 +10978,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
@ -11281,6 +11306,7 @@ targets:
- src/core/lib/debug/trace.h - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/ref_counted_ptr.h
@ -11306,6 +11332,7 @@ targets:
- test/core/promise/latch_test.cc - test/core/promise/latch_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -11435,6 +11462,7 @@ targets:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/cpp_impl_of.h - src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -11546,6 +11574,7 @@ targets:
- gtest - gtest
- upb_message_lib - upb_message_lib
- utf8_range_lib - utf8_range_lib
- absl/base:config
- absl/functional:function_ref - absl/functional:function_ref
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
@ -11925,6 +11954,7 @@ targets:
language: c++ language: c++
headers: headers:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/ref_counted_ptr.h
@ -11942,6 +11972,7 @@ targets:
- test/core/promise/mpsc_test.cc - test/core/promise/mpsc_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -12954,6 +12985,7 @@ targets:
- src/core/lib/debug/trace.h - src/core/lib/debug/trace.h
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h - src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
- src/core/lib/gprpp/ref_counted_ptr.h - src/core/lib/gprpp/ref_counted_ptr.h
@ -12980,6 +13012,7 @@ targets:
- test/core/promise/promise_mutex_test.cc - test/core/promise/promise_mutex_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor
@ -17274,6 +17307,7 @@ targets:
language: c++ language: c++
headers: headers:
- src/core/lib/gprpp/atomic_utils.h - src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/down_cast.h
- src/core/lib/gprpp/notification.h - src/core/lib/gprpp/notification.h
- src/core/lib/gprpp/orphanable.h - src/core/lib/gprpp/orphanable.h
- src/core/lib/gprpp/ref_counted.h - src/core/lib/gprpp/ref_counted.h
@ -17292,6 +17326,7 @@ targets:
- test/core/promise/wait_for_callback_test.cc - test/core/promise/wait_for_callback_test.cc
deps: deps:
- gtest - gtest
- absl/base:config
- absl/hash:hash - absl/hash:hash
- absl/meta:type_traits - absl/meta:type_traits
- absl/status:statusor - absl/status:statusor

3
gRPC-C++.podspec generated

@ -219,6 +219,7 @@ Pod::Spec.new do |s|
abseil_version = '1.20230802.0' abseil_version = '1.20230802.0'
ss.dependency 'abseil/algorithm/container', abseil_version ss.dependency 'abseil/algorithm/container', abseil_version
ss.dependency 'abseil/base/base', abseil_version ss.dependency 'abseil/base/base', abseil_version
ss.dependency 'abseil/base/config', abseil_version
ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version
ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version
ss.dependency 'abseil/container/flat_hash_map', abseil_version ss.dependency 'abseil/container/flat_hash_map', abseil_version
@ -1012,6 +1013,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/crash.h',
'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/directory_reader.h', 'src/core/lib/gprpp/directory_reader.h',
'src/core/lib/gprpp/down_cast.h',
'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/env.h',
'src/core/lib/gprpp/examine_stack.h', 'src/core/lib/gprpp/examine_stack.h',
@ -2259,6 +2261,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/crash.h',
'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/directory_reader.h', 'src/core/lib/gprpp/directory_reader.h',
'src/core/lib/gprpp/down_cast.h',
'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/env.h',
'src/core/lib/gprpp/examine_stack.h', 'src/core/lib/gprpp/examine_stack.h',

3
gRPC-Core.podspec generated

@ -188,6 +188,7 @@ Pod::Spec.new do |s|
ss.dependency 'BoringSSL-GRPC', '0.0.31' ss.dependency 'BoringSSL-GRPC', '0.0.31'
ss.dependency 'abseil/algorithm/container', abseil_version ss.dependency 'abseil/algorithm/container', abseil_version
ss.dependency 'abseil/base/base', abseil_version ss.dependency 'abseil/base/base', abseil_version
ss.dependency 'abseil/base/config', abseil_version
ss.dependency 'abseil/base/core_headers', abseil_version ss.dependency 'abseil/base/core_headers', abseil_version
ss.dependency 'abseil/cleanup/cleanup', abseil_version ss.dependency 'abseil/cleanup/cleanup', abseil_version
ss.dependency 'abseil/container/flat_hash_map', abseil_version ss.dependency 'abseil/container/flat_hash_map', abseil_version
@ -1511,6 +1512,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/crash.h',
'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/directory_reader.h', 'src/core/lib/gprpp/directory_reader.h',
'src/core/lib/gprpp/down_cast.h',
'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/env.h',
'src/core/lib/gprpp/examine_stack.cc', 'src/core/lib/gprpp/examine_stack.cc',
@ -3033,6 +3035,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/crash.h', 'src/core/lib/gprpp/crash.h',
'src/core/lib/gprpp/debug_location.h', 'src/core/lib/gprpp/debug_location.h',
'src/core/lib/gprpp/directory_reader.h', 'src/core/lib/gprpp/directory_reader.h',
'src/core/lib/gprpp/down_cast.h',
'src/core/lib/gprpp/dual_ref_counted.h', 'src/core/lib/gprpp/dual_ref_counted.h',
'src/core/lib/gprpp/env.h', 'src/core/lib/gprpp/env.h',
'src/core/lib/gprpp/examine_stack.h', 'src/core/lib/gprpp/examine_stack.h',

1
grpc.gemspec generated

@ -1414,6 +1414,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/crash.h ) s.files += %w( src/core/lib/gprpp/crash.h )
s.files += %w( src/core/lib/gprpp/debug_location.h ) s.files += %w( src/core/lib/gprpp/debug_location.h )
s.files += %w( src/core/lib/gprpp/directory_reader.h ) s.files += %w( src/core/lib/gprpp/directory_reader.h )
s.files += %w( src/core/lib/gprpp/down_cast.h )
s.files += %w( src/core/lib/gprpp/dual_ref_counted.h ) s.files += %w( src/core/lib/gprpp/dual_ref_counted.h )
s.files += %w( src/core/lib/gprpp/env.h ) s.files += %w( src/core/lib/gprpp/env.h )
s.files += %w( src/core/lib/gprpp/examine_stack.cc ) s.files += %w( src/core/lib/gprpp/examine_stack.cc )

3
grpc.gyp generated

@ -254,6 +254,7 @@
're2', 're2',
'z', 'z',
'absl/algorithm:container', 'absl/algorithm:container',
'absl/base:config',
'absl/cleanup:cleanup', 'absl/cleanup:cleanup',
'absl/container:flat_hash_map', 'absl/container:flat_hash_map',
'absl/container:flat_hash_set', 'absl/container:flat_hash_set',
@ -1120,6 +1121,7 @@
'utf8_range_lib', 'utf8_range_lib',
'z', 'z',
'absl/algorithm:container', 'absl/algorithm:container',
'absl/base:config',
'absl/cleanup:cleanup', 'absl/cleanup:cleanup',
'absl/container:flat_hash_map', 'absl/container:flat_hash_map',
'absl/container:flat_hash_set', 'absl/container:flat_hash_set',
@ -2008,6 +2010,7 @@
're2', 're2',
'utf8_range_lib', 'utf8_range_lib',
'z', 'z',
'absl/base:config',
'absl/cleanup:cleanup', 'absl/cleanup:cleanup',
'absl/container:flat_hash_map', 'absl/container:flat_hash_map',
'absl/container:flat_hash_set', 'absl/container:flat_hash_set',

1
package.xml generated

@ -1396,6 +1396,7 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/crash.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/crash.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/debug_location.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/debug_location.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/directory_reader.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/directory_reader.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/down_cast.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/dual_ref_counted.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/dual_ref_counted.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/env.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/env.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/examine_stack.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/examine_stack.cc" role="src" />

@ -495,11 +495,15 @@ grpc_cc_library(
grpc_cc_library( grpc_cc_library(
name = "context", name = "context",
external_deps = ["absl/meta:type_traits"],
language = "c++", language = "c++",
public_hdrs = [ public_hdrs = [
"lib/promise/context.h", "lib/promise/context.h",
], ],
deps = ["//:gpr"], deps = [
"down_cast",
"//:gpr",
],
) )
grpc_cc_library( grpc_cc_library(

@ -332,7 +332,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
GRPC_CHANNEL_IDLE)) { GRPC_CHANNEL_IDLE)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: %striggering exit idle", chand_, gpr_log(GPR_INFO, "chand=%p calld=%p: %striggering exit idle", chand_,
this, Activity::current()->DebugTag().c_str()); this, GetContext<Activity>()->DebugTag().c_str());
} }
// Bounce into the control plane work serializer to start resolving. // Bounce into the control plane work serializer to start resolving.
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExitIdle"); GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExitIdle");
@ -349,7 +349,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
auto result = CheckResolution(was_queued_); auto result = CheckResolution(was_queued_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: %sCheckResolution returns %s", gpr_log(GPR_INFO, "chand=%p calld=%p: %sCheckResolution returns %s",
chand_, this, Activity::current()->DebugTag().c_str(), chand_, this, GetContext<Activity>()->DebugTag().c_str(),
result.has_value() ? result->ToString().c_str() : "Pending"); result.has_value() ? result->ToString().c_str() : "Pending");
} }
if (!result.has_value()) return Pending{}; if (!result.has_value()) return Pending{};
@ -372,7 +372,7 @@ class ClientChannel::PromiseBasedCallData : public ClientChannel::CallData {
void OnAddToQueueLocked() override void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_) {
waker_ = Activity::current()->MakeNonOwningWaker(); waker_ = GetContext<Activity>()->MakeNonOwningWaker();
was_queued_ = true; was_queued_ = true;
} }
@ -3541,7 +3541,7 @@ ClientChannel::PromiseBasedLoadBalancedCall::MakeCallPromise(
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p lb_call=%p: %sPickSubchannel() returns %s", "chand=%p lb_call=%p: %sPickSubchannel() returns %s",
chand(), this, chand(), this,
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
result.has_value() ? result->ToString().c_str() result.has_value() ? result->ToString().c_str()
: "Pending"); : "Pending");
} }
@ -3624,7 +3624,7 @@ ClientChannel::PromiseBasedLoadBalancedCall::send_initial_metadata() const {
} }
void ClientChannel::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() { void ClientChannel::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() {
waker_ = Activity::current()->MakeNonOwningWaker(); waker_ = GetContext<Activity>()->MakeNonOwningWaker();
was_queued_ = true; was_queued_ = true;
} }

@ -291,7 +291,7 @@ LegacyServerCompressionFilter::MakeCallPromise(
auto r = DecompressMessage(std::move(message), decompress_args); auto r = DecompressMessage(std::move(message), decompress_args);
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
r.status().ToString().c_str()); r.status().ToString().c_str());
} }
if (!r.ok()) { if (!r.ok()) {
@ -306,7 +306,7 @@ LegacyServerCompressionFilter::MakeCallPromise(
[this, compression_algorithm](ServerMetadataHandle md) { [this, compression_algorithm](ServerMetadataHandle md) {
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[compression] Write metadata", gpr_log(GPR_INFO, "%s[compression] Write metadata",
Activity::current()->DebugTag().c_str()); GetContext<Activity>()->DebugTag().c_str());
} }
// Find the compression algorithm. // Find the compression algorithm.
*compression_algorithm = HandleOutgoingMetadata(*md); *compression_algorithm = HandleOutgoingMetadata(*md);

@ -141,7 +141,7 @@ ServerMetadataHandle HttpServerFilter::Call::OnClientInitialMetadata(
void HttpServerFilter::Call::OnServerInitialMetadata(ServerMetadata& md) { void HttpServerFilter::Call::OnServerInitialMetadata(ServerMetadata& md) {
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[http-server] Write metadata", gpr_log(GPR_INFO, "%s[http-server] Write metadata",
Activity::current()->DebugTag().c_str()); GetContext<Activity>()->DebugTag().c_str());
} }
FilterOutgoingMetadata(&md); FilterOutgoingMetadata(&md);
md.Set(HttpStatusMetadata(), 200); md.Set(HttpStatusMetadata(), 200);

@ -164,8 +164,8 @@ ServerMetadataHandle CheckPayload(const Message& msg,
if (!max_length.has_value()) return nullptr; if (!max_length.has_value()) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_call_trace)) {
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d", gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
Activity::current()->DebugTag().c_str(), is_send ? "send" : "recv", GetContext<Activity>()->DebugTag().c_str(),
msg.payload()->Length(), *max_length); is_send ? "send" : "recv", msg.payload()->Length(), *max_length);
} }
if (msg.payload()->Length() <= *max_length) return nullptr; if (msg.payload()->Length() <= *max_length) return nullptr;
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>()); auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>());

@ -364,7 +364,7 @@ class ConnectedChannelStream : public Orphanable {
grpc_stream_refcount stream_refcount_; grpc_stream_refcount stream_refcount_;
StreamPtr stream_; StreamPtr stream_;
Arena* arena_ = GetContext<Arena>(); Arena* arena_ = GetContext<Arena>();
Party* const party_ = static_cast<Party*>(Activity::current()); Party* const party_ = GetContext<Party>();
ExternallyObservableLatch<void> finished_; ExternallyObservableLatch<void> finished_;
}; };
@ -383,17 +383,18 @@ auto ConnectedChannelStream::RecvMessages(
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"%s[connected] RecvMessage: received payload of %" PRIdPTR "%s[connected] RecvMessage: received payload of %" PRIdPTR
" bytes", " bytes",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
pending_message->payload()->Length()); pending_message->payload()->Length());
} }
return Map(incoming_messages.Push(std::move(pending_message)), return Map(incoming_messages.Push(std::move(pending_message)),
[](bool ok) -> LoopCtl<absl::Status> { [](bool ok) -> LoopCtl<absl::Status> {
if (!ok) { if (!ok) {
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(
"%s[connected] RecvMessage: failed to " GPR_INFO,
"push message towards the application", "%s[connected] RecvMessage: failed to "
Activity::current()->DebugTag().c_str()); "push message towards the application",
GetContext<Activity>()->DebugTag().c_str());
} }
return absl::OkStatus(); return absl::OkStatus();
} }
@ -406,7 +407,7 @@ auto ConnectedChannelStream::RecvMessages(
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"%s[connected] RecvMessage: reached end of stream with " "%s[connected] RecvMessage: reached end of stream with "
"status:%s", "status:%s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
status.status().ToString().c_str()); status.status().ToString().c_str());
} }
if (cancel_on_error && !status.ok()) { if (cancel_on_error && !status.ok()) {
@ -444,7 +445,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
transport->filter_stack_transport()->InitStream(stream->stream(), transport->filter_stack_transport()->InitStream(stream->stream(),
stream->stream_refcount(), stream->stream_refcount(),
nullptr, GetContext<Arena>()); nullptr, GetContext<Arena>());
auto* party = static_cast<Party*>(Activity::current()); auto* party = GetContext<Party>();
party->Spawn("set_polling_entity", call_args.polling_entity->Wait(), party->Spawn("set_polling_entity", call_args.polling_entity->Wait(),
[transport, stream = stream->InternalRef()]( [transport, stream = stream->InternalRef()](
grpc_polling_entity polling_entity) { grpc_polling_entity polling_entity) {
@ -474,7 +475,7 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(Transport* transport,
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"%s[connected] Publish client initial metadata: %s", "%s[connected] Publish client initial metadata: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
server_initial_metadata->DebugString().c_str()); server_initial_metadata->DebugString().c_str());
} }
return Map(pipe->Push(std::move(server_initial_metadata)), return Map(pipe->Push(std::move(server_initial_metadata)),
@ -581,7 +582,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
stream->stream(), stream->stream_refcount(), stream->stream(), stream->stream_refcount(),
GetContext<CallContext>()->server_call_context()->server_stream_data(), GetContext<CallContext>()->server_call_context()->server_stream_data(),
GetContext<Arena>()); GetContext<Arena>());
auto* party = static_cast<Party*>(Activity::current()); auto* party = GetContext<Party>();
// Arifacts we need for the lifetime of the call. // Arifacts we need for the lifetime of the call.
struct CallData { struct CallData {
@ -747,7 +748,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
: nullptr; : nullptr;
if (md != nullptr) { if (md != nullptr) {
call_data->sent_initial_metadata = true; call_data->sent_initial_metadata = true;
auto* party = static_cast<Party*>(Activity::current()); auto* party = GetContext<Party>();
party->Spawn("connected/send_initial_metadata", party->Spawn("connected/send_initial_metadata",
GetContext<BatchBuilder>()->SendServerInitialMetadata( GetContext<BatchBuilder>()->SendServerInitialMetadata(
stream->batch_target(), std::move(md)), stream->batch_target(), std::move(md)),
@ -777,7 +778,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
"%s[connected] Got trailing metadata; status=%s metadata=%s", "%s[connected] Got trailing metadata; status=%s metadata=%s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
status.status().ToString().c_str(), status.status().ToString().c_str(),
status.ok() ? (*status)->DebugString().c_str() : "<none>"); status.ok() ? (*status)->DebugString().c_str() : "<none>");
} }

@ -374,11 +374,11 @@ void BaseCallData::SendMessage::GotPipe(T* pipe_end) {
switch (state_) { switch (state_) {
case State::kInitial: case State::kInitial:
state_ = State::kIdle; state_ = State::kIdle;
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
break; break;
case State::kGotBatchNoPipe: case State::kGotBatchNoPipe:
state_ = State::kGotBatch; state_ = State::kGotBatch;
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
break; break;
case State::kIdle: case State::kIdle:
case State::kGotBatch: case State::kGotBatch:
@ -567,7 +567,7 @@ void BaseCallData::SendMessage::WakeInsideCombiner(Flusher* flusher,
} }
if (completed_status_.ok()) { if (completed_status_.ok()) {
state_ = State::kIdle; state_ = State::kIdle;
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} else { } else {
state_ = State::kCancelled; state_ = State::kCancelled;
} }
@ -675,7 +675,7 @@ void BaseCallData::ReceiveMessage::GotPipe(T* pipe_end) {
break; break;
case State::kBatchCompletedNoPipe: case State::kBatchCompletedNoPipe:
state_ = State::kBatchCompleted; state_ = State::kBatchCompleted;
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
break; break;
case State::kIdle: case State::kIdle:
case State::kForwardedBatch: case State::kForwardedBatch:

@ -282,6 +282,12 @@ class ContextHolder<std::unique_ptr<Context, Deleter>> {
std::unique_ptr<Context, Deleter> value_; std::unique_ptr<Context, Deleter> value_;
}; };
template <>
class Context<Activity> {
public:
static Activity* get() { return Activity::current(); }
};
template <typename HeldContext> template <typename HeldContext>
using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType; using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
@ -632,13 +638,13 @@ ActivityPtr MakeActivity(Factory promise_factory,
} }
inline Pending IntraActivityWaiter::pending() { inline Pending IntraActivityWaiter::pending() {
wakeups_ |= Activity::current()->CurrentParticipant(); wakeups_ |= GetContext<Activity>()->CurrentParticipant();
return Pending(); return Pending();
} }
inline void IntraActivityWaiter::Wake() { inline void IntraActivityWaiter::Wake() {
if (wakeups_ == 0) return; if (wakeups_ == 0) return;
Activity::current()->ForceImmediateRepoll(std::exchange(wakeups_, 0)); GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
} }
} // namespace grpc_core } // namespace grpc_core

@ -19,8 +19,12 @@
#include <utility> #include <utility>
#include "absl/meta/type_traits.h"
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/gprpp/down_cast.h"
namespace grpc_core { namespace grpc_core {
// To avoid accidentally creating context types, we require an explicit // To avoid accidentally creating context types, we require an explicit
@ -28,17 +32,35 @@ namespace grpc_core {
// not contain any members, only exist. // not contain any members, only exist.
// The reason for avoiding this is that context types each use a thread local. // The reason for avoiding this is that context types each use a thread local.
template <typename T> template <typename T>
struct ContextType; // IWYU pragma: keep struct ContextType;
// Some contexts can be subclassed. If the subclass is set as that context
// then GetContext<Base>() will return the base, and GetContext<Derived>() will
// down_cast to the derived type.
// Specializations of this type should be created for each derived type, and
// should have a single using statement Base pointing to the derived base class.
// Example:
// class SomeContext {};
// class SomeDerivedContext : public SomeContext {};
// template <> struct ContextType<SomeContext> {};
// template <> struct ContextSubclass<SomeDerivedContext> {
// using Base = SomeContext;
// };
template <typename Derived>
struct ContextSubclass;
namespace promise_detail { namespace promise_detail {
template <typename T, typename = void>
class Context;
template <typename T> template <typename T>
class Context : public ContextType<T> { class ThreadLocalContext : public ContextType<T> {
public: public:
explicit Context(T* p) : old_(current_) { current_ = p; } explicit ThreadLocalContext(T* p) : old_(current_) { current_ = p; }
~Context() { current_ = old_; } ~ThreadLocalContext() { current_ = old_; }
Context(const Context&) = delete; ThreadLocalContext(const ThreadLocalContext&) = delete;
Context& operator=(const Context&) = delete; ThreadLocalContext& operator=(const ThreadLocalContext&) = delete;
static T* get() { return current_; } static T* get() { return current_; }
@ -48,7 +70,23 @@ class Context : public ContextType<T> {
}; };
template <typename T> template <typename T>
thread_local T* Context<T>::current_; thread_local T* ThreadLocalContext<T>::current_;
template <typename T>
class Context<T, absl::void_t<decltype(ContextType<T>())>>
: public ThreadLocalContext<T> {
using ThreadLocalContext<T>::ThreadLocalContext;
};
template <typename T>
class Context<T, absl::void_t<typename ContextSubclass<T>::Base>>
: public Context<typename ContextSubclass<T>::Base> {
public:
using Context<typename ContextSubclass<T>::Base>::Context;
static T* get() {
return down_cast<T*>(Context<typename ContextSubclass<T>::Base>::get());
}
};
template <typename T, typename F> template <typename T, typename F>
class WithContext { class WithContext {

@ -118,7 +118,7 @@ class ForEach {
}; };
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " FOR_EACH[0x", return absl::StrCat(GetContext<Activity>()->DebugTag(), " FOR_EACH[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }

@ -56,7 +56,8 @@ class InterActivityLatch<void> {
if (is_set_) { if (is_set_) {
return Empty{}; return Empty{};
} else { } else {
return waiters_.AddPending(Activity::current()->MakeNonOwningWaker()); return waiters_.AddPending(
GetContext<Activity>()->MakeNonOwningWaker());
} }
}; };
} }
@ -78,7 +79,7 @@ class InterActivityLatch<void> {
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), return absl::StrCat(GetContext<Activity>()->DebugTag(),
" INTER_ACTIVITY_LATCH[0x", " INTER_ACTIVITY_LATCH[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }

@ -42,7 +42,7 @@ class InterActivityPipe {
ReleasableMutexLock lock(&mu_); ReleasableMutexLock lock(&mu_);
if (closed_) return false; if (closed_) return false;
if (count_ == kQueueSize) { if (count_ == kQueueSize) {
on_available_ = Activity::current()->MakeNonOwningWaker(); on_available_ = GetContext<Activity>()->MakeNonOwningWaker();
return Pending{}; return Pending{};
} }
queue_[(first_ + count_) % kQueueSize] = std::move(value); queue_[(first_ + count_) % kQueueSize] = std::move(value);
@ -59,7 +59,7 @@ class InterActivityPipe {
ReleasableMutexLock lock(&mu_); ReleasableMutexLock lock(&mu_);
if (count_ == 0) { if (count_ == 0) {
if (closed_) return absl::nullopt; if (closed_) return absl::nullopt;
on_occupied_ = Activity::current()->MakeNonOwningWaker(); on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker();
return Pending{}; return Pending{};
} }
auto value = std::move(queue_[first_]); auto value = std::move(queue_[first_]);

@ -113,7 +113,7 @@ class Latch {
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x", return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }
@ -189,7 +189,7 @@ class Latch<void> {
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }
@ -259,7 +259,7 @@ class ExternallyObservableLatch<void> {
private: private:
std::string DebugTag() { std::string DebugTag() {
return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x", return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
reinterpret_cast<uintptr_t>(this), "]: "); reinterpret_cast<uintptr_t>(this), "]: ");
} }

@ -62,7 +62,7 @@ class Center : public RefCounted<Center<T>> {
bool PollReceiveBatch(std::vector<T>& dest) { bool PollReceiveBatch(std::vector<T>& dest) {
ReleasableMutexLock lock(&mu_); ReleasableMutexLock lock(&mu_);
if (queue_.empty()) { if (queue_.empty()) {
receive_waker_ = Activity::current()->MakeNonOwningWaker(); receive_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
return false; return false;
} }
dest.swap(queue_); dest.swap(queue_);
@ -87,7 +87,7 @@ class Center : public RefCounted<Center<T>> {
receive_waker.Wakeup(); receive_waker.Wakeup();
return Poll<bool>(true); return Poll<bool>(true);
} }
send_wakers_.AddPending(Activity::current()->MakeNonOwningWaker()); send_wakers_.AddPending(GetContext<Activity>()->MakeNonOwningWaker());
return Pending{}; return Pending{};
} }

@ -385,10 +385,10 @@ class Party : public Activity, private Wakeable {
// This is useful for implementing batching and the like: we can hold some // This is useful for implementing batching and the like: we can hold some
// action until the rest of the party resolves itself. // action until the rest of the party resolves itself.
auto AfterCurrentPoll() { auto AfterCurrentPoll() {
GPR_DEBUG_ASSERT(Activity::current() == this); GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
sync_.WakeAfterPoll(CurrentParticipant()); sync_.WakeAfterPoll(CurrentParticipant());
return [this, iteration = sync_.iteration()]() -> Poll<Empty> { return [this, iteration = sync_.iteration()]() -> Poll<Empty> {
GPR_DEBUG_ASSERT(Activity::current() == this); GPR_DEBUG_ASSERT(GetContext<Activity>() == this);
if (iteration == sync_.iteration()) return Pending{}; if (iteration == sync_.iteration()) return Pending{};
return Empty{}; return Empty{};
}; };
@ -561,7 +561,7 @@ class Party : public Activity, private Wakeable {
GPR_NO_UNIQUE_ADDRESS Promise promise_; GPR_NO_UNIQUE_ADDRESS Promise promise_;
GPR_NO_UNIQUE_ADDRESS Result result_; GPR_NO_UNIQUE_ADDRESS Result result_;
}; };
Waker waiter_{Activity::current()->MakeOwningWaker()}; Waker waiter_{GetContext<Activity>()->MakeOwningWaker()};
std::atomic<State> state_{State::kFactory}; std::atomic<State> state_{State::kFactory};
}; };
@ -608,6 +608,11 @@ class Party : public Activity, private Wakeable {
std::atomic<Participant*> participants_[party_detail::kMaxParticipants] = {}; std::atomic<Participant*> participants_[party_detail::kMaxParticipants] = {};
}; };
template <>
struct ContextSubclass<Party> {
using Base = Activity;
};
template <typename Factory, typename OnComplete> template <typename Factory, typename OnComplete>
void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory,
OnComplete on_complete) { OnComplete on_complete) {

@ -374,7 +374,7 @@ class Center : public InterceptorList<T> {
const T& value() const { return value_; } const T& value() const { return value_; }
std::string DebugTag() { std::string DebugTag() {
if (auto* activity = Activity::current()) { if (auto* activity = GetContext<Activity>()) {
return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this),
"]: "); "]: ");
} else { } else {
@ -661,7 +661,7 @@ class Push {
if (center_ == nullptr) { if (center_ == nullptr) {
if (grpc_trace_promise_primitives.enabled()) { if (grpc_trace_promise_primitives.enabled()) {
gpr_log(GPR_DEBUG, "%s Pipe push has a null center", gpr_log(GPR_DEBUG, "%s Pipe push has a null center",
Activity::current()->DebugTag().c_str()); GetContext<Activity>()->DebugTag().c_str());
} }
return false; return false;
} }

@ -54,7 +54,7 @@ Poll<absl::Status> Sleep::operator()() {
} }
Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) Sleep::ActiveClosure::ActiveClosure(Timestamp deadline)
: waker_(Activity::current()->MakeOwningWaker()), : waker_(GetContext<Activity>()->MakeOwningWaker()),
timer_handle_(GetContext<EventEngine>()->RunAfter( timer_handle_(GetContext<EventEngine>()->RunAfter(
deadline - Timestamp::Now(), this)) {} deadline - Timestamp::Now(), this)) {}

@ -39,7 +39,7 @@ class WaitForCallback {
return [state = state_]() -> Poll<Empty> { return [state = state_]() -> Poll<Empty> {
MutexLock lock(&state->mutex); MutexLock lock(&state->mutex);
if (state->done) return Empty{}; if (state->done) return Empty{};
state->waker = Activity::current()->MakeNonOwningWaker(); state->waker = GetContext<Activity>()->MakeNonOwningWaker();
return Pending{}; return Pending{};
}; };
} }

@ -232,10 +232,10 @@ Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
if (!empty) { if (!empty) {
// If we don't, but the queue is probably not empty, schedule an immediate // If we don't, but the queue is probably not empty, schedule an immediate
// repoll. // repoll.
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} else { } else {
// Otherwise, schedule a wakeup for whenever something is pushed. // Otherwise, schedule a wakeup for whenever something is pushed.
state_->waker = Activity::current()->MakeNonOwningWaker(); state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
} }
return Pending{}; return Pending{};
} }
@ -465,7 +465,7 @@ void BasicMemoryQuota::Start() {
self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) + self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
1; 1;
reclaimer->Run(ReclamationSweep( reclaimer->Run(ReclamationSweep(
self, token, Activity::current()->MakeNonOwningWaker())); self, token, GetContext<Activity>()->MakeNonOwningWaker()));
// Return a promise that will wait for our barrier. This will be // Return a promise that will wait for our barrier. This will be
// awoken by the token above being destroyed. So, once that token is // awoken by the token above being destroyed. So, once that token is
// destroyed, we'll be able to proceed. // destroyed, we'll be able to proceed.

@ -314,7 +314,8 @@ grpc_oauth2_token_fetcher_credentials::GetRequestMetadata(
auto pending_request = auto pending_request =
grpc_core::MakeRefCounted<grpc_oauth2_pending_get_request_metadata>(); grpc_core::MakeRefCounted<grpc_oauth2_pending_get_request_metadata>();
pending_request->pollent = grpc_core::GetContext<grpc_polling_entity>(); pending_request->pollent = grpc_core::GetContext<grpc_polling_entity>();
pending_request->waker = grpc_core::Activity::current()->MakeNonOwningWaker(); pending_request->waker =
grpc_core::GetContext<grpc_core::Activity>()->MakeNonOwningWaker();
grpc_polling_entity_add_to_pollset_set( grpc_polling_entity_add_to_pollset_set(
pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_)); pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_));
pending_request->next = pending_requests_; pending_request->next = pending_requests_;

@ -103,7 +103,7 @@ struct grpc_plugin_credentials final : public grpc_call_credentials {
private: private:
std::atomic<bool> ready_{false}; std::atomic<bool> ready_{false};
grpc_core::Waker waker_{ grpc_core::Waker waker_{
grpc_core::Activity::current()->MakeNonOwningWaker()}; grpc_core::GetContext<grpc_core::Activity>()->MakeNonOwningWaker()};
grpc_core::RefCountedPtr<grpc_plugin_credentials> call_creds_; grpc_core::RefCountedPtr<grpc_plugin_credentials> call_creds_;
grpc_auth_metadata_context context_; grpc_auth_metadata_context context_;
grpc_core::ClientMetadataHandle md_; grpc_core::ClientMetadataHandle md_;

@ -124,7 +124,7 @@ class LegacyServerAuthFilter::RunApplicationCode {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"%s[server-auth]: Delegate to application: filter=%p this=%p " "%s[server-auth]: Delegate to application: filter=%p this=%p "
"auth_ctx=%p", "auth_ctx=%p",
Activity::current()->DebugTag().c_str(), filter, this, GetContext<Activity>()->DebugTag().c_str(), filter, this,
filter->auth_context_.get()); filter->auth_context_.get());
} }
filter->server_credentials_->auth_metadata_processor().process( filter->server_credentials_->auth_metadata_processor().process(
@ -152,7 +152,7 @@ class LegacyServerAuthFilter::RunApplicationCode {
private: private:
struct State { struct State {
explicit State(CallArgs call_args) : call_args(std::move(call_args)) {} explicit State(CallArgs call_args) : call_args(std::move(call_args)) {}
Waker waker{Activity::current()->MakeOwningWaker()}; Waker waker{GetContext<Activity>()->MakeOwningWaker()};
absl::StatusOr<CallArgs> call_args; absl::StatusOr<CallArgs> call_args;
grpc_metadata_array md = grpc_metadata_array md =
MetadataBatchToMetadataArray(call_args->client_initial_metadata.get()); MetadataBatchToMetadataArray(call_args->client_initial_metadata.get());

@ -123,7 +123,7 @@ grpc_metadata_array MetadataBatchToMetadataArray(
struct ServerAuthFilter::RunApplicationCode::State { struct ServerAuthFilter::RunApplicationCode::State {
explicit State(ClientMetadata& client_metadata) explicit State(ClientMetadata& client_metadata)
: client_metadata(&client_metadata) {} : client_metadata(&client_metadata) {}
Waker waker{Activity::current()->MakeOwningWaker()}; Waker waker{GetContext<Activity>()->MakeOwningWaker()};
absl::StatusOr<ClientMetadata*> client_metadata; absl::StatusOr<ClientMetadata*> client_metadata;
grpc_metadata_array md = MetadataBatchToMetadataArray(*client_metadata); grpc_metadata_array md = MetadataBatchToMetadataArray(*client_metadata);
std::atomic<bool> done{false}; std::atomic<bool> done{false};
@ -136,7 +136,7 @@ ServerAuthFilter::RunApplicationCode::RunApplicationCode(
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"%s[server-auth]: Delegate to application: filter=%p this=%p " "%s[server-auth]: Delegate to application: filter=%p this=%p "
"auth_ctx=%p", "auth_ctx=%p",
Activity::current()->DebugTag().c_str(), filter, this, GetContext<Activity>()->DebugTag().c_str(), filter, this,
filter->auth_context_.get()); filter->auth_context_.get());
} }
filter->server_credentials_->auth_metadata_processor().process( filter->server_credentials_->auth_metadata_processor().process(

@ -56,22 +56,22 @@ const grpc_channel_filter* PromiseTracingFilterFor(
gpr_log( gpr_log(
GPR_DEBUG, GPR_DEBUG,
"%s[%s] CreateCallPromise: client_initial_metadata=%s", "%s[%s] CreateCallPromise: client_initial_metadata=%s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, source_filter->name,
call_args.client_initial_metadata->DebugString().c_str()); call_args.client_initial_metadata->DebugString().c_str());
return [source_filter, child = next_promise_factory( return [source_filter, child = next_promise_factory(
std::move(call_args))]() mutable { std::move(call_args))]() mutable {
gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: begin", gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: begin",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name); source_filter->name);
auto r = child(); auto r = child();
if (auto* p = r.value_if_ready()) { if (auto* p = r.value_if_ready()) {
gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: done: %s", gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: done: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, (*p)->DebugString().c_str()); source_filter->name, (*p)->DebugString().c_str());
} else { } else {
gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: <<pending>>", gpr_log(GPR_DEBUG, "%s[%s] PollCallPromise: <<pending>>",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name); source_filter->name);
} }
return r; return r;
@ -84,35 +84,35 @@ const grpc_channel_filter* PromiseTracingFilterFor(
call->client_initial_metadata().receiver.InterceptAndMap( call->client_initial_metadata().receiver.InterceptAndMap(
[source_filter](ClientMetadataHandle md) { [source_filter](ClientMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s", gpr_log(GPR_DEBUG, "%s[%s] OnClientInitialMetadata: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str()); source_filter->name, md->DebugString().c_str());
return md; return md;
}); });
call->client_to_server_messages().receiver.InterceptAndMap( call->client_to_server_messages().receiver.InterceptAndMap(
[source_filter](MessageHandle msg) { [source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s", gpr_log(GPR_DEBUG, "%s[%s] OnClientToServerMessage: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str()); source_filter->name, msg->DebugString().c_str());
return msg; return msg;
}); });
call->server_initial_metadata().sender.InterceptAndMap( call->server_initial_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) { [source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s", gpr_log(GPR_DEBUG, "%s[%s] OnServerInitialMetadata: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str()); source_filter->name, md->DebugString().c_str());
return md; return md;
}); });
call->server_to_client_messages().sender.InterceptAndMap( call->server_to_client_messages().sender.InterceptAndMap(
[source_filter](MessageHandle msg) { [source_filter](MessageHandle msg) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s", gpr_log(GPR_DEBUG, "%s[%s] OnServerToClientMessage: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, msg->DebugString().c_str()); source_filter->name, msg->DebugString().c_str());
return msg; return msg;
}); });
call->server_trailing_metadata().sender.InterceptAndMap( call->server_trailing_metadata().sender.InterceptAndMap(
[source_filter](ServerMetadataHandle md) { [source_filter](ServerMetadataHandle md) {
gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s", gpr_log(GPR_DEBUG, "%s[%s] OnServerTrailingMetadata: %s",
Activity::current()->DebugTag().c_str(), GetContext<Activity>()->DebugTag().c_str(),
source_filter->name, md->DebugString().c_str()); source_filter->name, md->DebugString().c_str());
return md; return md;
}); });

@ -525,7 +525,7 @@ class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
"Too many pending requests for this server")); "Too many pending requests for this server"));
} }
auto w = std::make_shared<ActivityWaiter>( auto w = std::make_shared<ActivityWaiter>(
Activity::current()->MakeOwningWaker()); GetContext<Activity>()->MakeOwningWaker());
pending_.push(w); pending_.push(w);
return OnCancel( return OnCancel(
[w]() -> Poll<absl::StatusOr<MatchResult>> { [w]() -> Poll<absl::StatusOr<MatchResult>> {

@ -41,7 +41,7 @@ class WaitForCqEndOp {
} else { } else {
auto not_started = std::move(*n); auto not_started = std::move(*n);
auto& started = auto& started =
state_.emplace<Started>(Activity::current()->MakeOwningWaker()); state_.emplace<Started>(GetContext<Activity>()->MakeOwningWaker());
grpc_cq_end_op( grpc_cq_end_op(
not_started.cq, not_started.tag, std::move(not_started.error), not_started.cq, not_started.tag, std::move(not_started.error),
[](void* p, grpc_cq_completion*) { [](void* p, grpc_cq_completion*) {

@ -55,8 +55,7 @@ BatchBuilder::PendingCompletion::PendingCompletion(RefCountedPtr<Batch> batch)
BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload, BatchBuilder::Batch::Batch(grpc_transport_stream_op_batch_payload* payload,
grpc_stream_refcount* stream_refcount) grpc_stream_refcount* stream_refcount)
: party(static_cast<Party*>(Activity::current())->Ref()), : party(GetContext<Party>()->Ref()), stream_refcount(stream_refcount) {
stream_refcount(stream_refcount) {
batch.payload = payload; batch.payload = payload;
batch.is_traced = GetContext<CallContext>()->traced(); batch.is_traced = GetContext<CallContext>()->traced();
#ifndef NDEBUG #ifndef NDEBUG
@ -70,7 +69,7 @@ BatchBuilder::Batch::~Batch() {
auto* arena = party->arena(); auto* arena = party->arena();
if (grpc_call_trace.enabled()) { if (grpc_call_trace.enabled()) {
gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy", gpr_log(GPR_DEBUG, "%s[connected] [batch %p] Destroy",
Activity::current()->DebugTag().c_str(), this); GetContext<Activity>()->DebugTag().c_str(), this);
} }
if (pending_receive_message != nullptr) { if (pending_receive_message != nullptr) {
arena->DeletePooled(pending_receive_message); arena->DeletePooled(pending_receive_message);

@ -202,7 +202,7 @@ class BatchBuilder {
~Batch(); ~Batch();
Batch(const Batch&) = delete; Batch(const Batch&) = delete;
Batch& operator=(const Batch&) = delete; Batch& operator=(const Batch&) = delete;
std::string DebugPrefix(Activity* activity = Activity::current()) const { std::string DebugPrefix(Activity* activity = GetContext<Activity>()) const {
return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(), return absl::StrFormat("%s[connected] [batch %p] ", activity->DebugTag(),
this); this);
} }

@ -80,7 +80,7 @@ class PromiseEndpoint {
data.c_slice_buffer()); data.c_slice_buffer());
// If `Write()` returns true immediately, the callback will not be called. // If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result. // We still need to call our callback to pick up the result.
write_state_->waker = Activity::current()->MakeNonOwningWaker(); write_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
completed = endpoint_->Write( completed = endpoint_->Write(
[write_state = write_state_](absl::Status status) { [write_state = write_state_](absl::Status status) {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;
@ -124,7 +124,7 @@ class PromiseEndpoint {
static_cast<int64_t>(num_bytes - read_state_->buffer.Length())}; static_cast<int64_t>(num_bytes - read_state_->buffer.Length())};
// If `Read()` returns true immediately, the callback will not be // If `Read()` returns true immediately, the callback will not be
// called. // called.
read_state_->waker = Activity::current()->MakeNonOwningWaker(); read_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
if (endpoint_->Read( if (endpoint_->Read(
[read_state = read_state_, num_bytes](absl::Status status) { [read_state = read_state_, num_bytes](absl::Status status) {
ApplicationCallbackExecCtx callback_exec_ctx; ApplicationCallbackExecCtx callback_exec_ctx;

@ -203,7 +203,7 @@ class CallSpineInterface {
// and this construction supports that (and has helped the author not write // and this construction supports that (and has helped the author not write
// some bugs). // some bugs).
GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) { GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) {
GPR_DEBUG_ASSERT(Activity::current() == &party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
auto& c = cancel_latch(); auto& c = cancel_latch();
if (c.is_set()) return absl::nullopt; if (c.is_set()) return absl::nullopt;
c.Set(std::move(metadata)); c.Set(std::move(metadata));
@ -216,7 +216,7 @@ class CallSpineInterface {
} }
auto WaitForCancel() { auto WaitForCancel() {
GPR_DEBUG_ASSERT(Activity::current() == &party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
return cancel_latch().Wait(); return cancel_latch().Wait();
} }
@ -225,7 +225,7 @@ class CallSpineInterface {
// The resulting (returned) promise will resolve to Empty. // The resulting (returned) promise will resolve to Empty.
template <typename Promise> template <typename Promise>
auto CancelIfFails(Promise promise) { auto CancelIfFails(Promise promise) {
GPR_DEBUG_ASSERT(Activity::current() == &party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &party());
using P = promise_detail::PromiseLike<Promise>; using P = promise_detail::PromiseLike<Promise>;
using ResultType = typename P::Result; using ResultType = typename P::Result;
return Map(std::move(promise), [this](ResultType r) { return Map(std::move(promise), [this](ResultType r) {
@ -349,13 +349,13 @@ class CallInitiator {
: spine_(std::move(spine)) {} : spine_(std::move(spine)) {}
auto PushClientInitialMetadata(ClientMetadataHandle md) { auto PushClientInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().sender.Push(std::move(md)), return Map(spine_->client_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); }); [](bool ok) { return StatusFlag(ok); });
} }
auto PullServerInitialMetadata() { auto PullServerInitialMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().receiver.Next(), return Map(spine_->server_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md) [](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> { -> ValueOrFailure<ClientMetadataHandle> {
@ -365,7 +365,7 @@ class CallInitiator {
} }
auto PullServerTrailingMetadata() { auto PullServerTrailingMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Race(spine_->WaitForCancel(), return Race(spine_->WaitForCancel(),
Map(spine_->server_trailing_metadata().receiver.Next(), Map(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](NextResult<ServerMetadataHandle> md) [spine = spine_](NextResult<ServerMetadataHandle> md)
@ -376,19 +376,19 @@ class CallInitiator {
} }
auto PullMessage() { auto PullMessage() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->server_to_client_messages().receiver.Next(); return spine_->server_to_client_messages().receiver.Next();
} }
auto PushMessage(MessageHandle message) { auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map( return Map(
spine_->client_to_server_messages().sender.Push(std::move(message)), spine_->client_to_server_messages().sender.Push(std::move(message)),
[](bool r) { return StatusFlag(r); }); [](bool r) { return StatusFlag(r); });
} }
void FinishSends() { void FinishSends() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->client_to_server_messages().sender.Close(); spine_->client_to_server_messages().sender.Close();
} }
@ -398,7 +398,7 @@ class CallInitiator {
} }
void Cancel() { void Cancel() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore = std::ignore =
spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError())); spine_->Cancel(ServerMetadataFromStatus(absl::CancelledError()));
} }
@ -430,7 +430,7 @@ class CallHandler {
: spine_(std::move(spine)) {} : spine_(std::move(spine)) {}
auto PullClientInitialMetadata() { auto PullClientInitialMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().receiver.Next(), return Map(spine_->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md) [](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> { -> ValueOrFailure<ClientMetadataHandle> {
@ -440,13 +440,13 @@ class CallHandler {
} }
auto PushServerInitialMetadata(ServerMetadataHandle md) { auto PushServerInitialMetadata(ServerMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->server_initial_metadata().sender.Push(std::move(md)), return Map(spine_->server_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); }); [](bool ok) { return StatusFlag(ok); });
} }
auto PushServerTrailingMetadata(ServerMetadataHandle md) { auto PushServerTrailingMetadata(ServerMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
spine_->server_to_client_messages().sender.Close(); spine_->server_to_client_messages().sender.Close();
spine_->CallOnDone(); spine_->CallOnDone();
return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)), return Map(spine_->server_trailing_metadata().sender.Push(std::move(md)),
@ -454,19 +454,19 @@ class CallHandler {
} }
auto PullMessage() { auto PullMessage() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return spine_->client_to_server_messages().receiver.Next(); return spine_->client_to_server_messages().receiver.Next();
} }
auto PushMessage(MessageHandle message) { auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map( return Map(
spine_->server_to_client_messages().sender.Push(std::move(message)), spine_->server_to_client_messages().sender.Push(std::move(message)),
[](bool ok) { return StatusFlag(ok); }); [](bool ok) { return StatusFlag(ok); });
} }
void Cancel(ServerMetadataHandle status) { void Cancel(ServerMetadataHandle status) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party()); GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
std::ignore = spine_->Cancel(std::move(status)); std::ignore = spine_->Cancel(std::move(status));
} }

@ -219,7 +219,7 @@ bool FilterTestBase::Call::Impl::StepOnce() {
events().ForwardedMessageServerToClient(call_, *p->value()); events().ForwardedMessageServerToClient(call_, *p->value());
} }
next_server_to_client_messages_.reset(); next_server_to_client_messages_.reset();
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} }
} }
@ -229,7 +229,7 @@ bool FilterTestBase::Call::Impl::StepOnce() {
server_to_client_messages_sender_->Push( server_to_client_messages_sender_->Push(
std::move(forward_server_to_client_messages_.front()))); std::move(forward_server_to_client_messages_.front())));
forward_server_to_client_messages_.pop(); forward_server_to_client_messages_.pop();
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} }
} }
@ -251,7 +251,7 @@ bool FilterTestBase::Call::Impl::StepOnce() {
events().ForwardedMessageClientToServer(call_, *p->value()); events().ForwardedMessageClientToServer(call_, *p->value());
} }
next_client_to_server_messages_.reset(); next_client_to_server_messages_.reset();
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} }
} }
@ -261,7 +261,7 @@ bool FilterTestBase::Call::Impl::StepOnce() {
pipe_client_to_server_messages_.sender.Push( pipe_client_to_server_messages_.sender.Push(
std::move(forward_client_to_server_messages_.front()))); std::move(forward_client_to_server_messages_.front())));
forward_client_to_server_messages_.pop(); forward_client_to_server_messages_.pop();
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
} }
} }

@ -64,7 +64,7 @@ class DelayStartFilter final : public ChannelFilter {
[args = std::move(args), i = 10]() mutable -> Poll<CallArgs> { [args = std::move(args), i = 10]() mutable -> Poll<CallArgs> {
--i; --i;
if (i == 0) return std::move(args); if (i == 0) return std::move(args);
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
return Pending{}; return Pending{};
}, },
next); next);

@ -49,7 +49,7 @@ class Barrier {
if (cleared_) { if (cleared_) {
return Result{}; return Result{};
} else { } else {
return wait_set_.AddPending(Activity::current()->MakeOwningWaker()); return wait_set_.AddPending(GetContext<Activity>()->MakeOwningWaker());
} }
}; };
} }
@ -80,7 +80,7 @@ class SingleBarrier {
if (cleared_) { if (cleared_) {
return Result{}; return Result{};
} else { } else {
waker_ = Activity::current()->MakeOwningWaker(); waker_ = GetContext<Activity>()->MakeOwningWaker();
return Pending(); return Pending();
} }
}; };

@ -42,6 +42,45 @@ TEST(Context, WithContext) {
EXPECT_TRUE(test.done); EXPECT_TRUE(test.done);
} }
class BaseContext {
public:
virtual int Answer() = 0;
protected:
~BaseContext() = default;
};
class CorrectContext final : public BaseContext {
public:
int Answer() override { return 42; }
};
class IncorrectContext final : public BaseContext {
public:
int Answer() override { return 0; }
};
template <>
struct ContextType<BaseContext> {};
template <>
struct ContextSubclass<CorrectContext> {
using Base = BaseContext;
};
template <>
struct ContextSubclass<IncorrectContext> {
using Base = BaseContext;
};
TEST(Context, ContextSubclass) {
CorrectContext correct;
IncorrectContext incorrect;
EXPECT_EQ(42,
WithContext([]() { return GetContext<BaseContext>()->Answer(); },
&correct)());
EXPECT_EQ(0, WithContext([]() { return GetContext<BaseContext>()->Answer(); },
&incorrect)());
}
} // namespace grpc_core } // namespace grpc_core
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -138,7 +138,7 @@ class MoveableUntilPolled {
} }
Poll<absl::Status> operator()() { Poll<absl::Status> operator()() {
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
++polls_; ++polls_;
if (polls_ == 10) return absl::OkStatus(); if (polls_ == 10) return absl::OkStatus();
return Pending(); return Pending();

@ -47,7 +47,7 @@ class Delayed {
explicit Delayed(T x) : x_(x) {} explicit Delayed(T x) : x_(x) {}
Poll<T> operator()() { Poll<T> operator()() {
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
++polls_; ++polls_;
if (polls_ == 10) return std::move(x_); if (polls_ == 10) return std::move(x_);
return Pending(); return Pending();

@ -284,10 +284,10 @@ TEST_F(PartyTest, CanSpawnAndRun) {
party->Spawn( party->Spawn(
"TestSpawn", "TestSpawn",
[i = 10]() mutable -> Poll<int> { [i = 10]() mutable -> Poll<int> {
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
gpr_log(GPR_DEBUG, "i=%d", i); gpr_log(GPR_DEBUG, "i=%d", i);
GPR_ASSERT(i > 0); GPR_ASSERT(i > 0);
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
--i; --i;
if (i == 0) return 42; if (i == 0) return 42;
return Pending{}; return Pending{};
@ -331,12 +331,12 @@ TEST_F(PartyTest, CanSpawnFromSpawn) {
party->Spawn( party->Spawn(
"TestSpawn", "TestSpawn",
[party, &n2]() -> Poll<int> { [party, &n2]() -> Poll<int> {
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
party->Spawn( party->Spawn(
"TestSpawnInner", "TestSpawnInner",
[i = 10]() mutable -> Poll<int> { [i = 10]() mutable -> Poll<int> {
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
Activity::current()->ForceImmediateRepoll(); GetContext<Activity>()->ForceImmediateRepoll();
--i; --i;
if (i == 0) return 42; if (i == 0) return 42;
return Pending{}; return Pending{};
@ -363,8 +363,8 @@ TEST_F(PartyTest, CanWakeupWithOwningWaker) {
party->Spawn( party->Spawn(
"TestSpawn", "TestSpawn",
[i = 0, &waker, &n]() mutable -> Poll<int> { [i = 0, &waker, &n]() mutable -> Poll<int> {
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
waker = Activity::current()->MakeOwningWaker(); waker = GetContext<Activity>()->MakeOwningWaker();
n[i].Notify(); n[i].Notify();
i++; i++;
if (i == 10) return 42; if (i == 10) return 42;
@ -389,8 +389,8 @@ TEST_F(PartyTest, CanWakeupWithNonOwningWaker) {
party->Spawn( party->Spawn(
"TestSpawn", "TestSpawn",
[i = 10, &waker, &n]() mutable -> Poll<int> { [i = 10, &waker, &n]() mutable -> Poll<int> {
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
waker = Activity::current()->MakeNonOwningWaker(); waker = GetContext<Activity>()->MakeNonOwningWaker();
--i; --i;
n[9 - i].Notify(); n[9 - i].Notify();
if (i == 0) return 42; if (i == 0) return 42;
@ -416,8 +416,8 @@ TEST_F(PartyTest, CanWakeupWithNonOwningWakerAfterOrphaning) {
"TestSpawn", "TestSpawn",
[&waker, &set_waker]() mutable -> Poll<int> { [&waker, &set_waker]() mutable -> Poll<int> {
EXPECT_FALSE(set_waker.HasBeenNotified()); EXPECT_FALSE(set_waker.HasBeenNotified());
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
waker = Activity::current()->MakeNonOwningWaker(); waker = GetContext<Activity>()->MakeNonOwningWaker();
set_waker.Notify(); set_waker.Notify();
return Pending{}; return Pending{};
}, },
@ -437,9 +437,9 @@ TEST_F(PartyTest, CanDropNonOwningWakeAfterOrphaning) {
"TestSpawn", "TestSpawn",
[&waker, &set_waker]() mutable -> Poll<int> { [&waker, &set_waker]() mutable -> Poll<int> {
EXPECT_FALSE(set_waker.HasBeenNotified()); EXPECT_FALSE(set_waker.HasBeenNotified());
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
waker = waker = std::make_unique<Waker>(
std::make_unique<Waker>(Activity::current()->MakeNonOwningWaker()); GetContext<Activity>()->MakeNonOwningWaker());
set_waker.Notify(); set_waker.Notify();
return Pending{}; return Pending{};
}, },
@ -458,8 +458,8 @@ TEST_F(PartyTest, CanWakeupNonOwningOrphanedWakerWithNoEffect) {
"TestSpawn", "TestSpawn",
[&waker, &set_waker]() mutable -> Poll<int> { [&waker, &set_waker]() mutable -> Poll<int> {
EXPECT_FALSE(set_waker.HasBeenNotified()); EXPECT_FALSE(set_waker.HasBeenNotified());
EXPECT_EQ(Activity::current()->DebugTag(), "TestParty"); EXPECT_EQ(GetContext<Activity>()->DebugTag(), "TestParty");
waker = Activity::current()->MakeNonOwningWaker(); waker = GetContext<Activity>()->MakeNonOwningWaker();
set_waker.Notify(); set_waker.Notify();
return Pending{}; return Pending{};
}, },
@ -574,9 +574,9 @@ class PromiseNotification {
if (done_) return 42; if (done_) return 42;
if (!polled_) { if (!polled_) {
if (owning_waker_) { if (owning_waker_) {
waker_ = Activity::current()->MakeOwningWaker(); waker_ = GetContext<Activity>()->MakeOwningWaker();
} else { } else {
waker_ = Activity::current()->MakeNonOwningWaker(); waker_ = GetContext<Activity>()->MakeNonOwningWaker();
} }
polled_ = true; polled_ = true;
} }

@ -304,10 +304,10 @@ class Fuzzer {
if (!called) { if (!called) {
if (config.owning()) { if (config.owning()) {
wakers_[config.waker()].push_back( wakers_[config.waker()].push_back(
Activity::current()->MakeOwningWaker()); GetContext<Activity>()->MakeOwningWaker());
} else { } else {
wakers_[config.waker()].push_back( wakers_[config.waker()].push_back(
Activity::current()->MakeNonOwningWaker()); GetContext<Activity>()->MakeNonOwningWaker());
} }
return Pending(); return Pending();
} }

@ -2413,6 +2413,7 @@ src/core/lib/gprpp/crash.cc \
src/core/lib/gprpp/crash.h \ src/core/lib/gprpp/crash.h \
src/core/lib/gprpp/debug_location.h \ src/core/lib/gprpp/debug_location.h \
src/core/lib/gprpp/directory_reader.h \ src/core/lib/gprpp/directory_reader.h \
src/core/lib/gprpp/down_cast.h \
src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/dual_ref_counted.h \
src/core/lib/gprpp/env.h \ src/core/lib/gprpp/env.h \
src/core/lib/gprpp/examine_stack.cc \ src/core/lib/gprpp/examine_stack.cc \

@ -2191,6 +2191,7 @@ src/core/lib/gprpp/crash.cc \
src/core/lib/gprpp/crash.h \ src/core/lib/gprpp/crash.h \
src/core/lib/gprpp/debug_location.h \ src/core/lib/gprpp/debug_location.h \
src/core/lib/gprpp/directory_reader.h \ src/core/lib/gprpp/directory_reader.h \
src/core/lib/gprpp/down_cast.h \
src/core/lib/gprpp/dual_ref_counted.h \ src/core/lib/gprpp/dual_ref_counted.h \
src/core/lib/gprpp/env.h \ src/core/lib/gprpp/env.h \
src/core/lib/gprpp/examine_stack.cc \ src/core/lib/gprpp/examine_stack.cc \

Loading…
Cancel
Save