Use AnyInvocable in EventEngine APIs (#30220)

revert-30252-ARGUE
AJ Heller 2 years ago committed by GitHub
parent f4c162f30d
commit 1076a7d447
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      BUILD
  2. 24
      CMakeLists.txt
  3. 15
      build_autogenerated.yaml
  4. 1
      gRPC-C++.podspec
  5. 1
      gRPC-Core.podspec
  6. 2
      grpc.gemspec
  7. 2
      grpc.gyp
  8. 35
      include/grpc/event_engine/event_engine.h
  9. 2
      package.xml
  10. 10
      src/core/lib/event_engine/event_engine.cc
  11. 13
      src/core/lib/event_engine/iomgr_engine/iomgr_engine.cc
  12. 19
      src/core/lib/event_engine/iomgr_engine/iomgr_engine.h
  13. 8
      src/core/lib/event_engine/iomgr_engine/thread_pool.cc
  14. 7
      src/core/lib/event_engine/iomgr_engine/thread_pool.h
  15. 10
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
  16. 13
      test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
  17. 15
      test/core/event_engine/test_suite/oracle_event_engine_posix.cc
  18. 24
      test/core/event_engine/test_suite/oracle_event_engine_posix.h
  19. 2
      tools/distrib/fix_build_deps.py

@ -2241,6 +2241,7 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/time",
"absl/functional:any_invocable",
],
deps = [
"gpr_base",
@ -2313,6 +2314,7 @@ grpc_cc_library(
hdrs = [
"src/core/lib/event_engine/iomgr_engine/thread_pool.h",
],
external_deps = ["absl/functional:any_invocable"],
tags = ["grpc-autodeps"],
deps = ["gpr_base"],
)
@ -2346,6 +2348,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",
@ -2408,6 +2411,10 @@ grpc_cc_library(
srcs = [
"src/core/lib/event_engine/event_engine.cc",
],
external_deps = [
"absl/functional:any_invocable",
"absl/memory",
],
deps = [
"default_event_engine_factory",
"default_event_engine_factory_hdrs",
@ -2730,7 +2737,9 @@ grpc_cc_library(
"absl/base:core_headers",
"absl/container:flat_hash_map",
"absl/container:inlined_vector",
"absl/functional:any_invocable",
"absl/functional:bind_front",
"absl/functional:function_ref",
"absl/memory",
"absl/meta:type_traits",
"absl/status:statusor",

24
CMakeLists.txt generated

@ -101,6 +101,7 @@ set_property(CACHE gRPC_ABSL_PROVIDER PROPERTY STRINGS "module" "package")
set(gRPC_ABSL_USED_TARGETS
absl_algorithm
absl_algorithm_container
absl_any_invocable
absl_atomic_hook
absl_bad_optional_access
absl_bad_variant_access
@ -2348,6 +2349,7 @@ target_link_libraries(grpc
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
absl::any_invocable
absl::bind_front
absl::function_ref
absl::hash
@ -2913,6 +2915,7 @@ target_link_libraries(grpc_unsecure
absl::flat_hash_map
absl::flat_hash_set
absl::inlined_vector
absl::any_invocable
absl::bind_front
absl::function_ref
absl::hash
@ -6760,6 +6763,7 @@ target_include_directories(arena_promise_test
target_link_libraries(arena_promise_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -8212,6 +8216,7 @@ target_include_directories(chunked_vector_test
target_link_libraries(chunked_vector_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -9819,6 +9824,7 @@ target_include_directories(exec_ctx_wakeup_scheduler_test
target_link_libraries(exec_ctx_wakeup_scheduler_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -10188,6 +10194,7 @@ target_include_directories(flow_control_test
target_link_libraries(flow_control_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::function_ref
absl::type_traits
absl::statusor
@ -10251,6 +10258,7 @@ target_link_libraries(for_each_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
absl::hash
absl::type_traits
absl::statusor
@ -12908,6 +12916,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
target_link_libraries(memory_quota_stress_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -12967,6 +12976,7 @@ target_include_directories(memory_quota_test
target_link_libraries(memory_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -13730,6 +13740,7 @@ target_include_directories(periodic_update_test
target_link_libraries(periodic_update_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::statusor
gpr
upb
@ -13823,6 +13834,7 @@ target_include_directories(pipe_test
target_link_libraries(pipe_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -14679,6 +14691,7 @@ target_include_directories(resource_quota_test
target_link_libraries(resource_quota_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::any_invocable
absl::type_traits
absl::statusor
absl::variant
@ -16676,6 +16689,7 @@ target_link_libraries(test_core_event_engine_iomgr_event_engine_timer_heap_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::any_invocable
absl::memory
absl::random_random
absl::status
@ -16769,6 +16783,7 @@ target_link_libraries(test_core_event_engine_iomgr_event_engine_timer_list_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::any_invocable
absl::memory
absl::random_random
absl::status
@ -16894,6 +16909,7 @@ target_link_libraries(test_core_gprpp_time_test
${_gRPC_ALLTARGETS_LIBRARIES}
absl::base
absl::core_headers
absl::any_invocable
absl::memory
absl::random_random
absl::status
@ -20707,7 +20723,7 @@ generate_pkgconfig(
"gRPC"
"high performance general RPC framework"
"${gRPC_CORE_VERSION}"
"gpr openssl absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"gpr openssl absl_any_invocable absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"-lgrpc -laddress_sorting -lre2 -lupb -lcares -lz"
""
"grpc.pc")
@ -20717,7 +20733,7 @@ generate_pkgconfig(
"gRPC unsecure"
"high performance general RPC framework without SSL"
"${gRPC_CORE_VERSION}"
"gpr absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"gpr absl_any_invocable absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"-lgrpc_unsecure"
""
"grpc_unsecure.pc")
@ -20727,7 +20743,7 @@ generate_pkgconfig(
"gRPC++"
"C++ wrapper for gRPC"
"${gRPC_CPP_VERSION}"
"grpc absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"grpc absl_any_invocable absl_base absl_bind_front absl_cleanup absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"-lgrpc++"
""
"grpc++.pc")
@ -20737,7 +20753,7 @@ generate_pkgconfig(
"gRPC++ unsecure"
"C++ wrapper for gRPC without SSL"
"${gRPC_CPP_VERSION}"
"grpc_unsecure absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"grpc_unsecure absl_any_invocable absl_base absl_bind_front absl_cord absl_core_headers absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_inlined_vector absl_memory absl_optional absl_random_random absl_span absl_status absl_statusor absl_str_format absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant"
"-lgrpc++_unsecure"
""
"grpc++_unsecure.pc")

@ -1671,6 +1671,7 @@ libs:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- absl/functional:any_invocable
- absl/functional:bind_front
- absl/functional:function_ref
- absl/hash:hash
@ -2470,6 +2471,7 @@ libs:
- absl/container:flat_hash_map
- absl/container:flat_hash_set
- absl/container:inlined_vector
- absl/functional:any_invocable
- absl/functional:bind_front
- absl/functional:function_ref
- absl/hash:hash
@ -4234,6 +4236,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/arena_promise_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -4812,6 +4815,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/gprpp/chunked_vector_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -5430,6 +5434,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/exec_ctx_wakeup_scheduler_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -5667,6 +5672,7 @@ targets:
- src/core/lib/transport/pid_controller.cc
- test/core/transport/chttp2/flow_control_test.cc
deps:
- absl/functional:any_invocable
- absl/functional:function_ref
- absl/meta:type_traits
- absl/status:statusor
@ -5755,6 +5761,7 @@ targets:
- test/core/promise/for_each_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
@ -6744,6 +6751,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/memory_quota_stress_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -6819,6 +6827,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/memory_quota_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -7163,6 +7172,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/periodic_update_test.cc
deps:
- absl/functional:any_invocable
- absl/status:statusor
- gpr
- upb
@ -7253,6 +7263,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/promise/pipe_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -7623,6 +7634,7 @@ targets:
- src/core/lib/slice/slice_string_helpers.cc
- test/core/resource_quota/resource_quota_test.cc
deps:
- absl/functional:any_invocable
- absl/meta:type_traits
- absl/status:statusor
- absl/types:variant
@ -8434,6 +8446,7 @@ targets:
deps:
- absl/base:base
- absl/base:core_headers
- absl/functional:any_invocable
- absl/memory:memory
- absl/random:random
- absl/status:status
@ -8533,6 +8546,7 @@ targets:
deps:
- absl/base:base
- absl/base:core_headers
- absl/functional:any_invocable
- absl/memory:memory
- absl/random:random
- absl/status:status
@ -8635,6 +8649,7 @@ targets:
deps:
- absl/base:base
- absl/base:core_headers
- absl/functional:any_invocable
- absl/memory:memory
- absl/random:random
- absl/status:status

1
gRPC-C++.podspec generated

@ -202,6 +202,7 @@ Pod::Spec.new do |s|
ss.dependency 'abseil/container/flat_hash_map', abseil_version
ss.dependency 'abseil/container/flat_hash_set', abseil_version
ss.dependency 'abseil/container/inlined_vector', abseil_version
ss.dependency 'abseil/functional/any_invocable', abseil_version
ss.dependency 'abseil/functional/bind_front', abseil_version
ss.dependency 'abseil/functional/function_ref', abseil_version
ss.dependency 'abseil/hash/hash', abseil_version

1
gRPC-Core.podspec generated

@ -177,6 +177,7 @@ Pod::Spec.new do |s|
ss.dependency 'abseil/container/flat_hash_map', abseil_version
ss.dependency 'abseil/container/flat_hash_set', abseil_version
ss.dependency 'abseil/container/inlined_vector', abseil_version
ss.dependency 'abseil/functional/any_invocable', abseil_version
ss.dependency 'abseil/functional/bind_front', abseil_version
ss.dependency 'abseil/functional/function_ref', abseil_version
ss.dependency 'abseil/hash/hash', abseil_version

2
grpc.gemspec generated

@ -1622,8 +1622,10 @@ Gem::Specification.new do |s|
s.files += %w( third_party/abseil-cpp/absl/debugging/symbolize_emscripten.inc )
s.files += %w( third_party/abseil-cpp/absl/debugging/symbolize_unimplemented.inc )
s.files += %w( third_party/abseil-cpp/absl/debugging/symbolize_win32.inc )
s.files += %w( third_party/abseil-cpp/absl/functional/any_invocable.h )
s.files += %w( third_party/abseil-cpp/absl/functional/bind_front.h )
s.files += %w( third_party/abseil-cpp/absl/functional/function_ref.h )
s.files += %w( third_party/abseil-cpp/absl/functional/internal/any_invocable.h )
s.files += %w( third_party/abseil-cpp/absl/functional/internal/front_binder.h )
s.files += %w( third_party/abseil-cpp/absl/functional/internal/function_ref.h )
s.files += %w( third_party/abseil-cpp/absl/hash/hash.h )

2
grpc.gyp generated

@ -359,6 +359,7 @@
'absl/container:flat_hash_map',
'absl/container:flat_hash_set',
'absl/container:inlined_vector',
'absl/functional:any_invocable',
'absl/functional:bind_front',
'absl/functional:function_ref',
'absl/hash:hash',
@ -1116,6 +1117,7 @@
'absl/container:flat_hash_map',
'absl/container:flat_hash_set',
'absl/container:inlined_vector',
'absl/functional:any_invocable',
'absl/functional:bind_front',
'absl/functional:function_ref',
'absl/hash:hash',

@ -19,6 +19,7 @@
#include <functional>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@ -172,7 +173,7 @@ class EventEngine {
/// For failed read operations, implementations should pass the appropriate
/// statuses to \a on_read. For example, callbacks might expect to receive
/// CANCELLED on endpoint shutdown.
virtual void Read(std::function<void(absl::Status)> on_read,
virtual void Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) = 0;
/// A struct representing optional arguments that may be provided to an
/// EventEngine Endpoint Write API call.
@ -206,7 +207,7 @@ class EventEngine {
/// For failed write operations, implementations should pass the appropriate
/// statuses to \a on_writable. For example, callbacks might expect to
/// receive CANCELLED on endpoint shutdown.
virtual void Write(std::function<void(absl::Status)> on_writable,
virtual void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) = 0;
/// Returns an address in the format described in DNSResolver. The returned
/// values are expected to remain valid for the life of the Endpoint.
@ -221,14 +222,14 @@ class EventEngine {
/// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
/// CANCELLED statuses on EventEngine shutdown.
using OnConnectCallback =
std::function<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;
absl::AnyInvocable<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;
/// Listens for incoming connection requests from gRPC clients and initiates
/// request processing once connections are established.
class Listener {
public:
/// Called when the listener has accepted a new client connection.
using AcceptCallback = std::function<void(
using AcceptCallback = absl::AnyInvocable<void(
std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
virtual ~Listener() = default;
/// Bind an address/port to this Listener.
@ -256,7 +257,7 @@ class EventEngine {
/// MemoryAllocators for Endpoint construction.
virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
/// Creates a client network connection to a remote network listener.
@ -308,12 +309,13 @@ class EventEngine {
/// Called with the collection of sockaddrs that were resolved from a given
/// target address.
using LookupHostnameCallback =
std::function<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
absl::AnyInvocable<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
/// Called with a collection of SRV records.
using LookupSRVCallback =
std::function<void(absl::StatusOr<std::vector<SRVRecord>>)>;
absl::AnyInvocable<void(absl::StatusOr<std::vector<SRVRecord>>)>;
/// Called with the result of a TXT record lookup
using LookupTXTCallback = std::function<void(absl::StatusOr<std::string>)>;
using LookupTXTCallback =
absl::AnyInvocable<void(absl::StatusOr<std::string>)>;
virtual ~DNSResolver() = default;
@ -381,13 +383,13 @@ class EventEngine {
/// Asynchronously executes a task as soon as possible.
///
/// \a Closures scheduled with \a Run cannot be cancelled. Unlike the
/// overloaded \a Closure alternative, the std::function version's \a closure
/// will be deleted by the EventEngine after the closure has been run.
/// overloaded \a Closure alternative, the absl::AnyInvocable version's \a
/// closure will be deleted by the EventEngine after the closure has been run.
///
/// This version of \a Run may be less performant than the \a Closure version
/// in some scenarios. This overload is useful in situations where performance
/// is not a critical concern.
virtual void Run(std::function<void()> closure) = 0;
virtual void Run(absl::AnyInvocable<void()> closure) = 0;
/// Synonymous with scheduling an alarm to run after duration \a when.
///
/// The \a closure will execute when time \a when arrives unless it has been
@ -398,22 +400,23 @@ class EventEngine {
///
/// The \a closure will execute when time \a when arrives unless it has been
/// cancelled via the \a Cancel method. If cancelled, the closure will not be
/// run. Unilke the overloaded \a Closure alternative, the std::function
/// run. Unilke the overloaded \a Closure alternative, the absl::AnyInvocable
/// version's \a closure will be deleted by the EventEngine after the closure
/// has been run, or upon cancellation.
///
/// This version of \a RunAfter may be less performant than the \a Closure
/// version in some scenarios. This overload is useful in situations where
/// performance is not a critical concern.
virtual TaskHandle RunAfter(Duration when, std::function<void()> closure) = 0;
virtual TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) = 0;
/// Request cancellation of a task.
///
/// If the associated closure has already been scheduled to run, it will not
/// be cancelled, and this function will return false.
///
/// If the associated callback has not been scheduled to run, it will be
/// cancelled, and the associated std::function or \a Closure* will not be
/// executed. In this case, Cancel will return true.
/// cancelled, and the associated absl::AnyInvocable or \a Closure* will not
/// be executed. In this case, Cancel will return true.
///
/// Implementation note: closures should be destroyed in a timely manner after
/// execution or cancelliation (milliseconds), since any state bound to the
@ -432,7 +435,7 @@ class EventEngine {
/// created, applications must set a custom EventEngine factory method *before*
/// grpc is initialized.
void SetDefaultEventEngineFactory(
std::function<std::unique_ptr<EventEngine>()> factory);
absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory);
/// Create an EventEngine using the default factory.
std::unique_ptr<EventEngine> CreateEventEngine();

2
package.xml generated

@ -1626,8 +1626,10 @@
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/debugging/symbolize_emscripten.inc" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/debugging/symbolize_unimplemented.inc" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/debugging/symbolize_win32.inc" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/any_invocable.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/bind_front.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/function_ref.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/internal/any_invocable.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/internal/front_binder.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/functional/internal/function_ref.h" role="src" />
<file baseinstalldir="/" name="third_party/abseil-cpp/absl/hash/hash.h" role="src" />

@ -14,10 +14,11 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/event_engine_factory.h"
@ -26,15 +27,16 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
std::atomic<const std::function<std::unique_ptr<EventEngine>()>*>
std::atomic<absl::AnyInvocable<std::unique_ptr<EventEngine>()>*>
g_event_engine_factory{nullptr};
std::atomic<EventEngine*> g_event_engine{nullptr};
} // namespace
void SetDefaultEventEngineFactory(
std::function<std::unique_ptr<EventEngine>()> factory) {
absl::AnyInvocable<std::unique_ptr<EventEngine>()> factory) {
delete g_event_engine_factory.exchange(
new std::function<std::unique_ptr<EventEngine>()>(std::move(factory)));
new absl::AnyInvocable<std::unique_ptr<EventEngine>()>(
std::move(factory)));
}
std::unique_ptr<EventEngine> CreateEventEngine() {

@ -20,6 +20,7 @@
#include <utility>
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/event_engine.h>
@ -49,7 +50,7 @@ grpc_core::Timestamp IomgrEventEngine::ToTimestamp(EventEngine::Duration when) {
}
struct IomgrEventEngine::ClosureData final : public EventEngine::Closure {
std::function<void()> cb;
absl::AnyInvocable<void()> cb;
iomgr_engine::Timer timer;
IomgrEventEngine* engine;
EventEngine::TaskHandle handle;
@ -92,7 +93,7 @@ bool IomgrEventEngine::Cancel(EventEngine::TaskHandle handle) {
}
EventEngine::TaskHandle IomgrEventEngine::RunAfter(
Duration when, std::function<void()> closure) {
Duration when, absl::AnyInvocable<void()> closure) {
return RunAfterInternal(when, std::move(closure));
}
@ -101,8 +102,8 @@ EventEngine::TaskHandle IomgrEventEngine::RunAfter(
return RunAfterInternal(when, [closure]() { closure->Run(); });
}
void IomgrEventEngine::Run(std::function<void()> closure) {
thread_pool_.Add(closure);
void IomgrEventEngine::Run(absl::AnyInvocable<void()> closure) {
thread_pool_.Add(std::move(closure));
}
void IomgrEventEngine::Run(EventEngine::Closure* closure) {
@ -110,7 +111,7 @@ void IomgrEventEngine::Run(EventEngine::Closure* closure) {
}
EventEngine::TaskHandle IomgrEventEngine::RunAfterInternal(
Duration when, std::function<void()> cb) {
Duration when, absl::AnyInvocable<void()> cb) {
auto when_ts = ToTimestamp(when);
auto* cd = new ClosureData;
cd->cb = std::move(cb);
@ -149,7 +150,7 @@ EventEngine::ConnectionHandle IomgrEventEngine::Connect(
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
IomgrEventEngine::CreateListener(
Listener::AcceptCallback /*on_accept*/,
std::function<void(absl::Status)> /*on_shutdown*/,
absl::AnyInvocable<void(absl::Status)> /*on_shutdown*/,
const EndpointConfig& /*config*/,
std::unique_ptr<MemoryAllocatorFactory> /*memory_allocator_factory*/) {
GPR_ASSERT(false && "unimplemented");

@ -18,10 +18,10 @@
#include <stdint.h>
#include <atomic>
#include <functional>
#include <memory>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@ -47,10 +47,10 @@ class IomgrEventEngine final : public EventEngine {
class IomgrEndpoint : public EventEngine::Endpoint {
public:
~IomgrEndpoint() override;
void Read(std::function<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(std::function<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* args) override;
void Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
const ResolvedAddress& GetPeerAddress() const override;
const ResolvedAddress& GetLocalAddress() const override;
};
@ -81,7 +81,7 @@ class IomgrEventEngine final : public EventEngine {
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
@ -97,15 +97,16 @@ class IomgrEventEngine final : public EventEngine {
std::unique_ptr<DNSResolver> GetDNSResolver(
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(std::function<void()> closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when, std::function<void()> closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
bool Cancel(TaskHandle handle) override;
private:
struct ClosureData;
EventEngine::TaskHandle RunAfterInternal(Duration when,
std::function<void()> cb);
absl::AnyInvocable<void()> cb);
grpc_core::Timestamp ToTimestamp(EventEngine::Duration when);
iomgr_engine::TimerManager timer_manager_;

@ -20,6 +20,8 @@
#include "src/core/lib/event_engine/iomgr_engine/thread_pool.h"
#include <utility>
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
@ -64,7 +66,7 @@ void ThreadPool::ThreadFunc() {
// Drain callbacks before considering shutdown to ensure all work
// gets completed.
if (!callbacks_.empty()) {
auto cb = callbacks_.front();
auto cb = std::move(callbacks_.front());
callbacks_.pop();
lock.Release();
cb();
@ -101,10 +103,10 @@ ThreadPool::~ThreadPool() {
ReapThreads(&dead_threads_);
}
void ThreadPool::Add(const std::function<void()>& callback) {
void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list
callbacks_.push(callback);
callbacks_.push(std::move(callback));
// Increase pool size or notify as needed
if (threads_waiting_ == 0) {
// Kick off a new thread

@ -21,10 +21,11 @@
#include <grpc/support/port_platform.h>
#include <functional>
#include <queue>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
@ -36,7 +37,7 @@ class ThreadPool final {
explicit ThreadPool(int reserve_threads);
~ThreadPool();
void Add(const std::function<void()>& callback);
void Add(absl::AnyInvocable<void()> callback);
private:
class Thread {
@ -57,7 +58,7 @@ class ThreadPool final {
grpc_core::CondVar cv_;
grpc_core::CondVar shutdown_cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
std::queue<absl::AnyInvocable<void()>> callbacks_;
int reserve_threads_;
int nthreads_;
int threads_waiting_;

@ -105,7 +105,7 @@ gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
}
void FuzzingEventEngine::Tick() {
std::vector<std::function<void()>> to_run;
std::vector<absl::AnyInvocable<void()>> to_run;
{
grpc_core::MutexLock lock(&mu_);
// Increment time
@ -138,7 +138,7 @@ FuzzingEventEngine::Time FuzzingEventEngine::Now() {
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
FuzzingEventEngine::CreateListener(Listener::AcceptCallback,
std::function<void(absl::Status)>,
absl::AnyInvocable<void(absl::Status)>,
const EndpointConfig&,
std::unique_ptr<MemoryAllocatorFactory>) {
abort();
@ -163,8 +163,8 @@ void FuzzingEventEngine::Run(Closure* closure) {
RunAfter(Duration::zero(), closure);
}
void FuzzingEventEngine::Run(std::function<void()> closure) {
RunAfter(Duration::zero(), closure);
void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
RunAfter(Duration::zero(), std::move(closure));
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
@ -173,7 +173,7 @@ EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
}
EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
Duration when, std::function<void()> closure) {
Duration when, absl::AnyInvocable<void()> closure) {
grpc_core::MutexLock lock(&mu_);
const intptr_t id = next_task_id_;
++next_task_id_;

@ -19,6 +19,8 @@
#include <cstdint>
#include <map>
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
@ -45,7 +47,7 @@ class FuzzingEventEngine : public EventEngine {
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
@ -64,9 +66,10 @@ class FuzzingEventEngine : public EventEngine {
const DNSResolver::ResolverOptions& options) override;
void Run(Closure* closure) override;
void Run(std::function<void()> closure) override;
void Run(absl::AnyInvocable<void()> closure) override;
TaskHandle RunAfter(Duration when, Closure* closure) override;
TaskHandle RunAfter(Duration when, std::function<void()> closure) override;
TaskHandle RunAfter(Duration when,
absl::AnyInvocable<void()> closure) override;
bool Cancel(TaskHandle handle) override;
using Time = std::chrono::time_point<FuzzingEventEngine, Duration>;
@ -75,10 +78,10 @@ class FuzzingEventEngine : public EventEngine {
private:
struct Task {
Task(intptr_t id, std::function<void()> closure)
Task(intptr_t id, absl::AnyInvocable<void()> closure)
: id(id), closure(std::move(closure)) {}
intptr_t id;
std::function<void()> closure;
absl::AnyInvocable<void()> closure;
};
gpr_timespec NowAsTimespec(gpr_clock_type clock_type)

@ -226,7 +226,7 @@ PosixOracleEndpoint::~PosixOracleEndpoint() {
close(socket_fd_);
}
void PosixOracleEndpoint::Read(std::function<void(absl::Status)> on_read,
void PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
GPR_ASSERT(buffer != nullptr);
int read_hint_bytes =
@ -236,8 +236,9 @@ void PosixOracleEndpoint::Read(std::function<void(absl::Status)> on_read,
ReadOperation(read_hint_bytes, buffer, std::move(on_read)));
}
void PosixOracleEndpoint::Write(std::function<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /*args*/) {
void PosixOracleEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* /*args*/) {
GPR_ASSERT(data != nullptr);
write_ops_channel_.Set(WriteOperation(data, std::move(on_writable)));
}
@ -245,8 +246,7 @@ void PosixOracleEndpoint::Write(std::function<void(absl::Status)> on_writable,
void PosixOracleEndpoint::ProcessReadOperations() {
gpr_log(GPR_INFO, "Starting thread to process read ops ...");
while (true) {
ReadOperation read_op;
read_op = read_ops_channel_.Get();
ReadOperation read_op = std::move(read_ops_channel_.Get());
read_ops_channel_.Reset();
if (!read_op.IsValid()) {
read_op(std::string(), absl::CancelledError("Closed"));
@ -266,8 +266,7 @@ void PosixOracleEndpoint::ProcessReadOperations() {
void PosixOracleEndpoint::ProcessWriteOperations() {
gpr_log(GPR_INFO, "Starting thread to process write ops ...");
while (true) {
WriteOperation write_op;
write_op = write_ops_channel_.Get();
WriteOperation write_op = std::move(write_ops_channel_.Get());
write_ops_channel_.Reset();
if (!write_op.IsValid()) {
write_op(absl::CancelledError("Closed"));
@ -285,7 +284,7 @@ void PosixOracleEndpoint::ProcessWriteOperations() {
PosixOracleListener::PosixOracleListener(
EventEngine::Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
: on_accept_(std::move(on_accept)),
on_shutdown_(std::move(on_shutdown)),

@ -42,10 +42,10 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
explicit PosixOracleEndpoint(int socket_fd);
static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
~PosixOracleEndpoint() override;
void Read(std::function<void(absl::Status)> on_read, SliceBuffer* buffer,
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(std::function<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
void Shutdown();
EventEngine::ResolvedAddress& GetPeerAddress() const override {
GPR_ASSERT(false && "unimplemented");
@ -62,7 +62,7 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
ReadOperation()
: num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {}
ReadOperation(int num_bytes_to_read, SliceBuffer* buffer,
std::function<void(absl::Status)>&& on_complete)
absl::AnyInvocable<void(absl::Status)>&& on_complete)
: num_bytes_to_read_(num_bytes_to_read),
buffer_(buffer),
on_complete_(std::move(on_complete)) {}
@ -78,7 +78,7 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
private:
int num_bytes_to_read_;
SliceBuffer* buffer_;
std::function<void(absl::Status)> on_complete_;
absl::AnyInvocable<void(absl::Status)> on_complete_;
};
// An internal helper class definition of Write operations to be performed
@ -87,7 +87,7 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
public:
WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {}
WriteOperation(SliceBuffer* buffer,
std::function<void(absl::Status)>&& on_complete)
absl::AnyInvocable<void(absl::Status)>&& on_complete)
: bytes_to_write_(ExtractSliceBufferIntoString(buffer)),
on_complete_(std::move(on_complete)) {}
bool IsValid() { return bytes_to_write_.length() > 0; }
@ -100,7 +100,7 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
private:
std::string bytes_to_write_;
std::function<void(absl::Status)> on_complete_;
absl::AnyInvocable<void(absl::Status)> on_complete_;
};
void ProcessReadOperations();
@ -119,7 +119,7 @@ class PosixOracleListener : public EventEngine::Listener {
public:
PosixOracleListener(
EventEngine::Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory);
~PosixOracleListener() override;
absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override;
@ -130,7 +130,7 @@ class PosixOracleListener : public EventEngine::Listener {
mutable absl::Mutex mu_;
EventEngine::Listener::AcceptCallback on_accept_;
std::function<void(absl::Status)> on_shutdown_;
absl::AnyInvocable<void(absl::Status)> on_shutdown_;
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_;
grpc_core::Thread serve_;
int pipefd_[2];
@ -146,7 +146,7 @@ class PosixOracleEventEngine final : public EventEngine {
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
std::function<void(absl::Status)> on_shutdown,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& /*config*/,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override {
@ -172,7 +172,7 @@ class PosixOracleEventEngine final : public EventEngine {
void Run(Closure* /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
void Run(std::function<void()> /*closure*/) override {
void Run(absl::AnyInvocable<void()> /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
TaskHandle RunAfter(EventEngine::Duration /*duration*/,
@ -180,7 +180,7 @@ class PosixOracleEventEngine final : public EventEngine {
GPR_ASSERT(false && "unimplemented");
}
TaskHandle RunAfter(EventEngine::Duration /*duration*/,
std::function<void()> /*closure*/) override {
absl::AnyInvocable<void()> /*closure*/) override {
GPR_ASSERT(false && "unimplemented");
}
bool Cancel(TaskHandle /*handle*/) override {

@ -58,6 +58,8 @@ EXTERNAL_DEPS = {
'absl/container:inlined_vector',
'absl/cleanup/cleanup.h':
'absl/cleanup',
'absl/functional/any_invocable.h':
'absl/functional:any_invocable',
'absl/functional/bind_front.h':
'absl/functional:bind_front',
'absl/functional/function_ref.h':

Loading…
Cancel
Save