diff --git a/CMakeLists.txt b/CMakeLists.txt
index a9e7007f462..98f77605b34 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1021,7 +1021,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx latch_test)
add_dependencies(buildtests_cxx lb_get_cpu_stats_test)
add_dependencies(buildtests_cxx lb_load_data_store_test)
- add_dependencies(buildtests_cxx lock_free_event_test)
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+ add_dependencies(buildtests_cxx lock_free_event_test)
+ endif()
add_dependencies(buildtests_cxx log_test)
add_dependencies(buildtests_cxx loop_test)
add_dependencies(buildtests_cxx map_pipe_test)
@@ -1074,6 +1076,9 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_engine_listener_utils_test)
endif()
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+ add_dependencies(buildtests_cxx posix_event_engine_connect_test)
+ endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
@@ -2092,10 +2097,13 @@ add_library(grpc
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
src/core/lib/event_engine/posix_engine/internal_errqueue.cc
src/core/lib/event_engine/posix_engine/lockfree_event.cc
+ src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
+ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
src/core/lib/event_engine/posix_engine/timer_manager.cc
+ src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
@@ -2111,6 +2119,7 @@ add_library(grpc
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/load_file.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/gprpp/time_averaged_stats.cc
@@ -2744,10 +2753,13 @@ add_library(grpc_unsecure
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
src/core/lib/event_engine/posix_engine/internal_errqueue.cc
src/core/lib/event_engine/posix_engine/lockfree_event.cc
+ src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
+ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
src/core/lib/event_engine/posix_engine/timer_manager.cc
+ src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
@@ -2763,6 +2775,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/windows/windows_engine.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
+ src/core/lib/gprpp/load_file.cc
src/core/lib/gprpp/status_helper.cc
src/core/lib/gprpp/time.cc
src/core/lib/gprpp/time_averaged_stats.cc
@@ -13201,39 +13214,41 @@ target_link_libraries(lb_load_data_store_test
endif()
if(gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
-add_executable(lock_free_event_test
- test/core/event_engine/posix/lock_free_event_test.cc
- third_party/googletest/googletest/src/gtest-all.cc
- third_party/googletest/googlemock/src/gmock-all.cc
-)
+ add_executable(lock_free_event_test
+ test/core/event_engine/posix/lock_free_event_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+ )
-target_include_directories(lock_free_event_test
- PRIVATE
- ${CMAKE_CURRENT_SOURCE_DIR}
- ${CMAKE_CURRENT_SOURCE_DIR}/include
- ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
- ${_gRPC_RE2_INCLUDE_DIR}
- ${_gRPC_SSL_INCLUDE_DIR}
- ${_gRPC_UPB_GENERATED_DIR}
- ${_gRPC_UPB_GRPC_GENERATED_DIR}
- ${_gRPC_UPB_INCLUDE_DIR}
- ${_gRPC_XXHASH_INCLUDE_DIR}
- ${_gRPC_ZLIB_INCLUDE_DIR}
- third_party/googletest/googletest/include
- third_party/googletest/googletest
- third_party/googletest/googlemock/include
- third_party/googletest/googlemock
- ${_gRPC_PROTO_GENS_DIR}
-)
+ target_include_directories(lock_free_event_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+ )
-target_link_libraries(lock_free_event_test
- ${_gRPC_PROTOBUF_LIBRARIES}
- ${_gRPC_ALLTARGETS_LIBRARIES}
- grpc_test_util
-)
+ target_link_libraries(lock_free_event_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ )
+endif()
endif()
if(gRPC_BUILD_TESTS)
@@ -14833,10 +14848,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_endpoint_test
- src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- src/core/lib/gprpp/load_file.cc
test/core/event_engine/posix/posix_endpoint_test.cc
test/core/event_engine/posix/posix_engine_test_utils.cc
test/core/event_engine/test_suite/event_engine_test.cc
@@ -14879,7 +14890,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_engine_listener_utils_test
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
test/core/event_engine/posix/posix_engine_listener_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -14911,14 +14921,56 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
)
+endif()
+endif()
+if(gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+
+ add_executable(posix_event_engine_connect_test
+ test/core/event_engine/posix/posix_event_engine_connect_test.cc
+ test/core/event_engine/test_suite/event_engine_test.cc
+ test/core/event_engine/test_suite/event_engine_test_utils.cc
+ test/core/event_engine/test_suite/oracle_event_engine_posix.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+ )
+
+ target_include_directories(posix_event_engine_connect_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+ )
+
+ target_link_libraries(posix_event_engine_connect_test
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ )
+
+
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_event_engine_test
+ test/core/event_engine/test_suite/client_test.cc
test/core/event_engine/test_suite/event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_utils.cc
+ test/core/event_engine/test_suite/oracle_event_engine_posix.cc
test/core/event_engine/test_suite/posix_event_engine_test.cc
test/core/event_engine/test_suite/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
@@ -18158,7 +18210,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(tcp_posix_socket_utils_test
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -18407,7 +18458,6 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(test_core_gprpp_load_file_test
- src/core/lib/gprpp/load_file.cc
test/core/gprpp/load_file_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -19406,7 +19456,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(traced_buffer_list_test
- src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
test/core/event_engine/posix/traced_buffer_list_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
diff --git a/Makefile b/Makefile
index 86ed1146edc..ad94b60d2f1 100644
--- a/Makefile
+++ b/Makefile
@@ -1399,10 +1399,13 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc \
src/core/lib/event_engine/posix_engine/internal_errqueue.cc \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
+ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
+ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
+ src/core/lib/event_engine/posix_engine/traced_buffer_list.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
@@ -1418,6 +1421,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
+ src/core/lib/gprpp/load_file.cc \
src/core/lib/gprpp/status_helper.cc \
src/core/lib/gprpp/time.cc \
src/core/lib/gprpp/time_averaged_stats.cc \
@@ -1914,10 +1918,13 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc \
src/core/lib/event_engine/posix_engine/internal_errqueue.cc \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
+ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
+ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
+ src/core/lib/event_engine/posix_engine/traced_buffer_list.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
@@ -1933,6 +1940,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/windows/windows_engine.cc \
src/core/lib/experiments/config.cc \
src/core/lib/experiments/experiments.cc \
+ src/core/lib/gprpp/load_file.cc \
src/core/lib/gprpp/status_helper.cc \
src/core/lib/gprpp/time.cc \
src/core/lib/gprpp/time_averaged_stats.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 9fcad05ca12..3d9761d3000 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -756,11 +756,14 @@ libs:
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.h
- src/core/lib/event_engine/posix_engine/internal_errqueue.h
- src/core/lib/event_engine/posix_engine/lockfree_event.h
+ - src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
+ - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
- src/core/lib/event_engine/posix_engine/timer_manager.h
+ - src/core/lib/event_engine/posix_engine/traced_buffer_list.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
@@ -782,6 +785,7 @@ libs:
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/dual_ref_counted.h
+ - src/core/lib/gprpp/load_file.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/notification.h
@@ -1483,10 +1487,13 @@ libs:
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
- src/core/lib/event_engine/posix_engine/internal_errqueue.cc
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
+ - src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
+ - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/timer.cc
- src/core/lib/event_engine/posix_engine/timer_heap.cc
- src/core/lib/event_engine/posix_engine/timer_manager.cc
+ - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
@@ -1502,6 +1509,7 @@ libs:
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/load_file.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/gprpp/time_averaged_stats.cc
@@ -2017,11 +2025,14 @@ libs:
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.h
- src/core/lib/event_engine/posix_engine/internal_errqueue.h
- src/core/lib/event_engine/posix_engine/lockfree_event.h
+ - src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
+ - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
- src/core/lib/event_engine/posix_engine/timer_manager.h
+ - src/core/lib/event_engine/posix_engine/traced_buffer_list.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
@@ -2043,6 +2054,7 @@ libs:
- src/core/lib/gprpp/cpp_impl_of.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/dual_ref_counted.h
+ - src/core/lib/gprpp/load_file.h
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/match.h
- src/core/lib/gprpp/notification.h
@@ -2389,10 +2401,13 @@ libs:
- src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc
- src/core/lib/event_engine/posix_engine/internal_errqueue.cc
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
+ - src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
+ - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/timer.cc
- src/core/lib/event_engine/posix_engine/timer_heap.cc
- src/core/lib/event_engine/posix_engine/timer_manager.cc
+ - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
@@ -2408,6 +2423,7 @@ libs:
- src/core/lib/event_engine/windows/windows_engine.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
+ - src/core/lib/gprpp/load_file.cc
- src/core/lib/gprpp/status_helper.cc
- src/core/lib/gprpp/time.cc
- src/core/lib/gprpp/time_averaged_stats.cc
@@ -7798,6 +7814,10 @@ targets:
- test/core/event_engine/posix/lock_free_event_test.cc
deps:
- grpc_test_util
+ platforms:
+ - linux
+ - posix
+ - mac
uses_polling: false
- name: log_test
gtest: true
@@ -8607,19 +8627,11 @@ targets:
build: test
language: c++
headers:
- - src/core/lib/event_engine/posix_engine/posix_endpoint.h
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- - src/core/lib/event_engine/posix_engine/traced_buffer_list.h
- - src/core/lib/gprpp/load_file.h
- test/core/event_engine/posix/posix_engine_test_utils.h
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
- test/core/event_engine/test_suite/oracle_event_engine_posix.h
src:
- - src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- - src/core/lib/gprpp/load_file.cc
- test/core/event_engine/posix/posix_endpoint_test.cc
- test/core/event_engine/posix/posix_engine_test_utils.cc
- test/core/event_engine/test_suite/event_engine_test.cc
@@ -8637,10 +8649,8 @@ targets:
language: c++
headers:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
src:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- test/core/event_engine/posix/posix_engine_listener_utils_test.cc
deps:
- grpc_test_util
@@ -8648,6 +8658,25 @@ targets:
- linux
- posix
- mac
+- name: posix_event_engine_connect_test
+ gtest: true
+ build: test
+ language: c++
+ headers:
+ - test/core/event_engine/test_suite/event_engine_test.h
+ - test/core/event_engine/test_suite/event_engine_test_utils.h
+ - test/core/event_engine/test_suite/oracle_event_engine_posix.h
+ src:
+ - test/core/event_engine/posix/posix_event_engine_connect_test.cc
+ - test/core/event_engine/test_suite/event_engine_test.cc
+ - test/core/event_engine/test_suite/event_engine_test_utils.cc
+ - test/core/event_engine/test_suite/oracle_event_engine_posix.cc
+ deps:
+ - grpc_test_util
+ platforms:
+ - linux
+ - posix
+ - mac
- name: posix_event_engine_test
gtest: true
build: test
@@ -8655,9 +8684,12 @@ targets:
headers:
- test/core/event_engine/test_suite/event_engine_test.h
- test/core/event_engine/test_suite/event_engine_test_utils.h
+ - test/core/event_engine/test_suite/oracle_event_engine_posix.h
src:
+ - test/core/event_engine/test_suite/client_test.cc
- test/core/event_engine/test_suite/event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_utils.cc
+ - test/core/event_engine/test_suite/oracle_event_engine_posix.cc
- test/core/event_engine/test_suite/posix_event_engine_test.cc
- test/core/event_engine/test_suite/timer_test.cc
deps:
@@ -8666,7 +8698,6 @@ targets:
- linux
- posix
- mac
- uses_polling: false
- name: promise_factory_test
gtest: true
build: test
@@ -10120,10 +10151,8 @@ targets:
gtest: true
build: test
language: c++
- headers:
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
+ headers: []
src:
- - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
deps:
- grpc_test_util
@@ -10251,10 +10280,8 @@ targets:
gtest: true
build: test
language: c++
- headers:
- - src/core/lib/gprpp/load_file.h
+ headers: []
src:
- - src/core/lib/gprpp/load_file.cc
- test/core/gprpp/load_file_test.cc
deps:
- grpc_test_util
@@ -10674,10 +10701,8 @@ targets:
gtest: true
build: test
language: c++
- headers:
- - src/core/lib/event_engine/posix_engine/traced_buffer_list.h
+ headers: []
src:
- - src/core/lib/event_engine/posix_engine/traced_buffer_list.cc
- test/core/event_engine/posix/traced_buffer_list_test.cc
deps:
- grpc_test_util
diff --git a/config.m4 b/config.m4
index 5618e9a060b..e331bca732a 100644
--- a/config.m4
+++ b/config.m4
@@ -481,10 +481,13 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc \
src/core/lib/event_engine/posix_engine/internal_errqueue.cc \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
+ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
+ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
+ src/core/lib/event_engine/posix_engine/traced_buffer_list.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
@@ -534,6 +537,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/gprpp/fork.cc \
src/core/lib/gprpp/global_config_env.cc \
src/core/lib/gprpp/host_port.cc \
+ src/core/lib/gprpp/load_file.cc \
src/core/lib/gprpp/mpscq.cc \
src/core/lib/gprpp/stat_posix.cc \
src/core/lib/gprpp/stat_windows.cc \
diff --git a/config.w32 b/config.w32
index e2bff4c3171..583161bc457 100644
--- a/config.w32
+++ b/config.w32
@@ -447,10 +447,13 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\posix_engine\\event_poller_posix_default.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\internal_errqueue.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\lockfree_event.cc " +
+ "src\\core\\lib\\event_engine\\posix_engine\\posix_endpoint.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " +
+ "src\\core\\lib\\event_engine\\posix_engine\\tcp_socket_utils.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer_heap.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer_manager.cc " +
+ "src\\core\\lib\\event_engine\\posix_engine\\traced_buffer_list.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_eventfd.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_pipe.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_posix_default.cc " +
@@ -500,6 +503,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\gprpp\\fork.cc " +
"src\\core\\lib\\gprpp\\global_config_env.cc " +
"src\\core\\lib\\gprpp\\host_port.cc " +
+ "src\\core\\lib\\gprpp\\load_file.cc " +
"src\\core\\lib\\gprpp\\mpscq.cc " +
"src\\core\\lib\\gprpp\\stat_posix.cc " +
"src\\core\\lib\\gprpp\\stat_windows.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index c28eba71860..8221b167aab 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -699,11 +699,14 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.h',
'src/core/lib/event_engine/posix_engine/internal_errqueue.h',
'src/core/lib/event_engine/posix_engine/lockfree_event.h',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
@@ -739,6 +742,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/global_config_env.h',
'src/core/lib/gprpp/global_config_generic.h',
'src/core/lib/gprpp/host_port.h',
+ 'src/core/lib/gprpp/load_file.h',
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/match.h',
'src/core/lib/gprpp/memory.h',
@@ -1579,11 +1583,14 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.h',
'src/core/lib/event_engine/posix_engine/internal_errqueue.h',
'src/core/lib/event_engine/posix_engine/lockfree_event.h',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
@@ -1619,6 +1626,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/global_config_env.h',
'src/core/lib/gprpp/global_config_generic.h',
'src/core/lib/gprpp/host_port.h',
+ 'src/core/lib/gprpp/load_file.h',
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/match.h',
'src/core/lib/gprpp/memory.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index a64908755ea..5c760c704fa 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1074,15 +1074,21 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/internal_errqueue.h',
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
'src/core/lib/event_engine/posix_engine/lockfree_event.h',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.cc',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.cc',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
@@ -1167,6 +1173,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/global_config_generic.h',
'src/core/lib/gprpp/host_port.cc',
'src/core/lib/gprpp/host_port.h',
+ 'src/core/lib/gprpp/load_file.cc',
+ 'src/core/lib/gprpp/load_file.h',
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/match.h',
'src/core/lib/gprpp/memory.h',
@@ -2214,11 +2222,14 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.h',
'src/core/lib/event_engine/posix_engine/internal_errqueue.h',
'src/core/lib/event_engine/posix_engine/lockfree_event.h',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
'src/core/lib/event_engine/posix_engine/timer_manager.h',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
@@ -2254,6 +2265,7 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/global_config_env.h',
'src/core/lib/gprpp/global_config_generic.h',
'src/core/lib/gprpp/host_port.h',
+ 'src/core/lib/gprpp/load_file.h',
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/match.h',
'src/core/lib/gprpp/memory.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index afc4820ab59..78b73fce800 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -985,15 +985,21 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/posix_engine/internal_errqueue.h )
s.files += %w( src/core/lib/event_engine/posix_engine/lockfree_event.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/lockfree_event.h )
+ s.files += %w( src/core/lib/event_engine/posix_engine/posix_endpoint.cc )
+ s.files += %w( src/core/lib/event_engine/posix_engine/posix_endpoint.h )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.h )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_closure.h )
+ s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc )
+ s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.h )
s.files += %w( src/core/lib/event_engine/posix_engine/timer.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/timer.h )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_heap.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_heap.h )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_manager.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/timer_manager.h )
+ s.files += %w( src/core/lib/event_engine/posix_engine/traced_buffer_list.cc )
+ s.files += %w( src/core/lib/event_engine/posix_engine/traced_buffer_list.h )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc )
@@ -1078,6 +1084,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/global_config_generic.h )
s.files += %w( src/core/lib/gprpp/host_port.cc )
s.files += %w( src/core/lib/gprpp/host_port.h )
+ s.files += %w( src/core/lib/gprpp/load_file.cc )
+ s.files += %w( src/core/lib/gprpp/load_file.h )
s.files += %w( src/core/lib/gprpp/manual_constructor.h )
s.files += %w( src/core/lib/gprpp/match.h )
s.files += %w( src/core/lib/gprpp/memory.h )
diff --git a/grpc.gyp b/grpc.gyp
index ddf64da5784..682b776a3dc 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -813,10 +813,13 @@
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc',
'src/core/lib/event_engine/posix_engine/internal_errqueue.cc',
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',
'src/core/lib/event_engine/posix_engine/timer_manager.cc',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
@@ -832,6 +835,7 @@
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
+ 'src/core/lib/gprpp/load_file.cc',
'src/core/lib/gprpp/status_helper.cc',
'src/core/lib/gprpp/time.cc',
'src/core/lib/gprpp/time_averaged_stats.cc',
@@ -1275,10 +1279,13 @@
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc',
'src/core/lib/event_engine/posix_engine/internal_errqueue.cc',
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',
'src/core/lib/event_engine/posix_engine/timer_manager.cc',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
@@ -1294,6 +1301,7 @@
'src/core/lib/event_engine/windows/windows_engine.cc',
'src/core/lib/experiments/config.cc',
'src/core/lib/experiments/experiments.cc',
+ 'src/core/lib/gprpp/load_file.cc',
'src/core/lib/gprpp/status_helper.cc',
'src/core/lib/gprpp/time.cc',
'src/core/lib/gprpp/time_averaged_stats.cc',
diff --git a/package.xml b/package.xml
index 63661275d47..0bafd0f35e7 100644
--- a/package.xml
+++ b/package.xml
@@ -967,15 +967,21 @@
+
+
+
+
+
+
@@ -1060,6 +1066,8 @@
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index b9eeafb592e..3a1f57d72b6 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -1507,7 +1507,6 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
- "absl/synchronization",
],
deps = [
"event_engine_poller",
@@ -1541,7 +1540,6 @@ grpc_cc_library(
"absl/status",
"absl/status:statusor",
"absl/strings",
- "absl/synchronization",
],
deps = [
"common_event_engine_closures",
@@ -1632,6 +1630,7 @@ grpc_cc_library(
"absl/types:optional",
],
deps = [
+ "event_engine_common",
"experiments",
"iomgr_port",
"load_file",
@@ -1734,8 +1733,12 @@ grpc_cc_library(
hdrs = ["lib/event_engine/posix_engine/posix_engine.h"],
external_deps = [
"absl/base:core_headers",
+ "absl/cleanup",
+ "absl/container:flat_hash_map",
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
+ "absl/hash",
+ "absl/meta:type_traits",
"absl/status",
"absl/status:statusor",
"absl/strings",
@@ -1749,8 +1752,11 @@ grpc_cc_library(
"experiments",
"init_internally",
"iomgr_port",
+ "posix_event_engine_closure",
+ "posix_event_engine_endpoint",
"posix_event_engine_event_poller",
"posix_event_engine_poller_posix_default",
+ "posix_event_engine_tcp_socket_utils",
"posix_event_engine_timer",
"posix_event_engine_timer_manager",
"//:event_engine_base_hdrs",
diff --git a/src/core/lib/event_engine/poller.h b/src/core/lib/event_engine/poller.h
index 0ef6ffc03a4..7f70fae6436 100644
--- a/src/core/lib/event_engine/poller.h
+++ b/src/core/lib/event_engine/poller.h
@@ -40,10 +40,16 @@ class Poller {
// polling.
//
// Returns:
- // * Poller::WorkResult::kKicked if it was Kicked.
- // * Poller::WorkResult::kDeadlineExceeded if timeout occurred
- // * Poller::WorkResult::kOk, otherwise indicating that the callback function
- // was run synchonously before some events were processed.
+ // * Poller::WorkResult::kKicked if it was Kicked. A poller that was Kicked
+ // may still process some events and if so, it may have run the
+ // schedule_poll_again callback function synchronously. When the poller
+ // returns Poller::WorkResult::kKicked tts upto the user to determine
+ // if the schedule_poll_again callback has run or not.
+ // * Poller::WorkResult::kDeadlineExceeded if timeout occurred. The
+ // schedule_poll_again callback is not run in this case.
+ // * Poller::WorkResult::kOk, otherwise indicating that the
+ // schedule_poll_again callback function was run synchonously before some
+ // events were processed.
virtual WorkResult Work(EventEngine::Duration timeout,
absl::FunctionRef schedule_poll_again) = 0;
// Trigger the threads executing Work(..) to break out as soon as possible.
diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
index c1ba73bf456..2b91ea1f637 100644
--- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
+++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
@@ -40,8 +40,6 @@
#include
#include
-#include "absl/synchronization/mutex.h"
-
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
@@ -49,6 +47,7 @@
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/strerror.h"
+#include "src/core/lib/gprpp/sync.h"
using ::grpc_event_engine::posix_engine::LockfreeEvent;
using ::grpc_event_engine::posix_engine::WakeupFd;
@@ -138,7 +137,7 @@ class Epoll1EventHandle : public EventHandle {
error_closure_->SetReady();
}
}
- absl::Mutex* mu() { return &mu_; }
+ grpc_core::Mutex* mu() { return &mu_; }
LockfreeEvent* ReadClosure() { return read_closure_.get(); }
LockfreeEvent* WriteClosure() { return write_closure_.get(); }
LockfreeEvent* ErrorClosure() { return error_closure_.get(); }
@@ -149,7 +148,7 @@ class Epoll1EventHandle : public EventHandle {
void HandleShutdownInternal(absl::Status why, bool releasing_fd);
// See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
// required.
- absl::Mutex mu_;
+ grpc_core::Mutex mu_;
int fd_;
// See Epoll1Poller::SetPendingActions for explanation on why pending_<***>_
// need to be atomic.
@@ -307,7 +306,7 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
{
// See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
// required here.
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
read_closure_->DestroyEvent();
write_closure_->DestroyEvent();
error_closure_->DestroyEvent();
@@ -316,7 +315,7 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
pending_write_.store(false, std::memory_order_release);
pending_error_.store(false, std::memory_order_release);
{
- absl::MutexLock lock(&poller_->mu_);
+ grpc_core::MutexLock lock(&poller_->mu_);
poller_->free_epoll1_handles_list_.push_back(this);
}
if (on_done != nullptr) {
@@ -374,7 +373,7 @@ Epoll1Poller::~Epoll1Poller() {
g_epoll_set_.epfd = -1;
}
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
while (!free_epoll1_handles_list_.empty()) {
Epoll1EventHandle* handle = reinterpret_cast(
free_epoll1_handles_list_.front());
@@ -388,7 +387,7 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
bool track_err) {
Epoll1EventHandle* new_handle = nullptr;
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
if (free_epoll1_handles_list_.empty()) {
new_handle = new Epoll1EventHandle(fd, this);
} else {
@@ -487,7 +486,7 @@ void Epoll1EventHandle::ShutdownHandle(absl::Status why) {
// in parallel is not safe because some of the lockfree event types e.g, read,
// write, error may-not have called SetShutdown when DestroyEvent gets
// called in the OrphanHandle method.
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
HandleShutdownInternal(why, false);
}
@@ -521,18 +520,20 @@ Poller::WorkResult Epoll1Poller::Work(
EventEngine::Duration timeout,
absl::FunctionRef schedule_poll_again) {
Events pending_events;
+ bool was_kicked_ext = false;
if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
if (DoEpollWait(timeout) == 0) {
return Poller::WorkResult::kDeadlineExceeded;
}
}
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
// If was_kicked_ is true, collect all pending events in this iteration.
if (ProcessEpollEvents(
was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
pending_events)) {
was_kicked_ = false;
+ was_kicked_ext = true;
}
if (pending_events.empty()) {
return Poller::WorkResult::kKicked;
@@ -544,11 +545,11 @@ Poller::WorkResult Epoll1Poller::Work(
for (auto& it : pending_events) {
it->ExecutePendingActions();
}
- return Poller::WorkResult::kOk;
+ return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
}
void Epoll1Poller::Kick() {
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
if (was_kicked_) {
return;
}
diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
index ef06a40e6c0..94e22ee43ae 100644
--- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
+++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
@@ -24,7 +24,6 @@
#include "absl/container/inlined_vector.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/string_view.h"
-#include "absl/synchronization/mutex.h"
#include
@@ -32,6 +31,7 @@
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
+#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_EPOLL
@@ -110,7 +110,7 @@ class Epoll1Poller : public PosixEventPoller {
#else
struct EpollSet {};
#endif
- absl::Mutex mu_;
+ grpc_core::Mutex mu_;
Scheduler* scheduler_;
// A singleton epoll set
EpollSet g_epoll_set_;
diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
index fcac4b68e8f..7b47052b0e4 100644
--- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
+++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
@@ -49,8 +49,6 @@
#include
#include
-#include "absl/synchronization/mutex.h"
-
#include
#include "src/core/lib/event_engine/common_closures.h"
@@ -60,6 +58,7 @@
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/strerror.h"
+#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
@@ -100,7 +99,7 @@ class PollEventHandle : public EventHandle {
write_closure_(
reinterpret_cast(kClosureNotReady)) {
poller_->Ref();
- absl::MutexLock lock(&poller_->mu_);
+ grpc_core::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListAddHandle(this);
}
PollPoller* Poller() override { return poller_; }
@@ -118,7 +117,7 @@ class PollEventHandle : public EventHandle {
return false;
}
void ForceRemoveHandleFromPoller() {
- absl::MutexLock lock(&poller_->mu_);
+ grpc_core::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListRemoveHandle(this);
}
int WrappedFd() override { return fd_; }
@@ -155,13 +154,13 @@ class PollEventHandle : public EventHandle {
void SetWritable() override;
void SetHasError() override;
bool IsHandleShutdown() override {
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
return is_shutdown_;
};
inline void ExecutePendingActions() {
int kick = 0;
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
if ((pending_actions_ & 1UL)) {
if (SetReadyLocked(&read_closure_)) {
kick = 1;
@@ -196,7 +195,7 @@ class PollEventHandle : public EventHandle {
}
}
~PollEventHandle() override = default;
- absl::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
+ grpc_core::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; }
PollPoller::HandlesList& PollerHandlesListPos() {
return poller_handles_list_;
@@ -211,7 +210,7 @@ class PollEventHandle : public EventHandle {
int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
// See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
// required.
- absl::Mutex mu_;
+ grpc_core::Mutex mu_;
std::atomic ref_count_{1};
int fd_;
int pending_actions_;
@@ -361,11 +360,11 @@ EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
}
void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
- absl::string_view /* reason */) {
+ absl::string_view /*reason*/) {
ForkFdListRemoveHandle(this);
ForceRemoveHandleFromPoller();
{
- absl::ReleasableMutexLock lock(&mu_);
+ grpc_core::ReleasableMutexLock lock(&mu_);
on_done_ = on_done;
released_ = release_fd != nullptr;
if (release_fd != nullptr) {
@@ -448,7 +447,7 @@ void PollEventHandle::ShutdownHandle(absl::Status why) {
// of a closure which calls OrphanHandle or poller->Shutdown() prematurely.
Ref();
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
// only shutdown once
if (!is_shutdown_) {
is_shutdown_ = true;
@@ -469,7 +468,7 @@ void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
// poller->Shutdown() prematurely.
Ref();
{
- absl::ReleasableMutexLock lock(&mu_);
+ grpc_core::ReleasableMutexLock lock(&mu_);
if (NotifyOnLocked(&read_closure_, on_read)) {
lock.Release();
// NotifyOnLocked immediately scheduled some closure. It would have set
@@ -491,7 +490,7 @@ void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
// poller->Shutdown() prematurely.
Ref();
{
- absl::ReleasableMutexLock lock(&mu_);
+ grpc_core::ReleasableMutexLock lock(&mu_);
if (NotifyOnLocked(&write_closure_, on_write)) {
lock.Release();
// NotifyOnLocked immediately scheduled some closure. It would have set
@@ -517,7 +516,7 @@ void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) {
void PollEventHandle::SetReadable() {
Ref();
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
SetReadyLocked(&read_closure_);
}
Unref();
@@ -526,7 +525,7 @@ void PollEventHandle::SetReadable() {
void PollEventHandle::SetWritable() {
Ref();
{
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
SetReadyLocked(&write_closure_);
}
Unref();
@@ -570,7 +569,7 @@ bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
}
void PollPoller::KickExternal(bool ext) {
- absl::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&mu_);
if (was_kicked_) {
if (ext) {
was_kicked_ext_ = true;
@@ -649,6 +648,7 @@ Poller::WorkResult PollPoller::Work(
bool was_kicked_ext = false;
PollEventHandle* watcher_space[inline_elements];
Events pending_events;
+ pending_events.clear();
int timeout_ms =
static_cast(grpc_event_engine::experimental::Milliseconds(timeout));
mu_.Lock();
@@ -684,7 +684,7 @@ Poller::WorkResult PollPoller::Work(
PollEventHandle* head = poll_handles_list_head_;
while (head != nullptr) {
{
- absl::MutexLock lock(head->mu());
+ grpc_core::MutexLock lock(head->mu());
// There shouldn't be any orphaned fds at this point. This is because
// prior to marking a handle as orphaned it is first removed from
// poll handle list for the poller under the poller lock.
@@ -733,7 +733,7 @@ Poller::WorkResult PollPoller::Work(
for (i = 1; i < pfd_count; i++) {
PollEventHandle* head = watchers[i];
int watch_mask;
- absl::ReleasableMutexLock lock(head->mu());
+ grpc_core::ReleasableMutexLock lock(head->mu());
if (head->IsWatched(watch_mask)) {
head->SetWatched(-1);
// This fd was Watched with a watch mask > 0.
@@ -772,7 +772,7 @@ Poller::WorkResult PollPoller::Work(
for (i = 1; i < pfd_count; i++) {
PollEventHandle* head = watchers[i];
int watch_mask;
- absl::ReleasableMutexLock lock(head->mu());
+ grpc_core::ReleasableMutexLock lock(head->mu());
if (!head->IsWatched(watch_mask) || watch_mask == 0) {
// IsWatched will be false if an orphan was invoked on the
// handle while it was being polled. If watch_mask is 0, then the fd
@@ -828,7 +828,7 @@ Poller::WorkResult PollPoller::Work(
for (auto& it : pending_events) {
it->ExecutePendingActions();
}
- return Poller::WorkResult::kOk;
+ return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
}
void PollPoller::Shutdown() {
diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h
index 04d1a248c75..d751a32277b 100644
--- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h
+++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h
@@ -24,13 +24,13 @@
#include "absl/base/thread_annotations.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/string_view.h"
-#include "absl/synchronization/mutex.h"
#include
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
+#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace posix_engine {
@@ -74,7 +74,7 @@ class PollPoller : public PosixEventPoller {
PollEventHandle* next = nullptr;
PollEventHandle* prev = nullptr;
};
- absl::Mutex mu_;
+ grpc_core::Mutex mu_;
Scheduler* scheduler_;
std::atomic ref_count_{1};
bool use_phony_poll_;
diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc
index e6162126aa0..082012ec602 100644
--- a/src/core/lib/event_engine/posix_engine/posix_engine.cc
+++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc
@@ -15,18 +15,24 @@
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
+#include
#include
#include
+#include
#include
#include
#include
+#include "absl/cleanup/cleanup.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
+#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
+#include "absl/strings/str_cat.h"
#include
#include
+#include
#include
#include "src/core/lib/debug/trace.h"
@@ -38,7 +44,13 @@
#include "src/core/lib/gprpp/sync.h"
#ifdef GRPC_POSIX_SOCKET_TCP
+#include // IWYU pragma: keep
+#include // IWYU pragma: keep
+#include // IWYU pragma: keep
+
+#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
+#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#endif // GRPC_POSIX_SOCKET_TCP
// IWYU pragma: no_include
@@ -49,7 +61,233 @@ namespace grpc_event_engine {
namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
-using grpc_event_engine::posix_engine::PosixEventPoller;
+using ::grpc_event_engine::posix_engine::EventHandle;
+using ::grpc_event_engine::posix_engine::PosixEngineClosure;
+using ::grpc_event_engine::posix_engine::PosixEventPoller;
+using ::grpc_event_engine::posix_engine::PosixSocketWrapper;
+using ::grpc_event_engine::posix_engine::PosixTcpOptions;
+using ::grpc_event_engine::posix_engine::SockaddrToString;
+using ::grpc_event_engine::posix_engine::TcpOptionsFromEndpointConfig;
+
+void AsyncConnect::Start(EventEngine::Duration timeout) {
+ on_writable_ = PosixEngineClosure::ToPermanentClosure(
+ [this](absl::Status status) { OnWritable(std::move(status)); });
+ alarm_handle_ = engine_->RunAfter(timeout, [this]() {
+ OnTimeoutExpired(absl::DeadlineExceededError("connect() timed out"));
+ });
+ fd_->NotifyOnWrite(on_writable_);
+}
+
+AsyncConnect ::~AsyncConnect() { delete on_writable_; }
+
+void AsyncConnect::OnTimeoutExpired(absl::Status status) {
+ bool done = false;
+ {
+ grpc_core::MutexLock lock(&mu_);
+ if (fd_ != nullptr) {
+ fd_->ShutdownHandle(std::move(status));
+ }
+ done = (--refs_ == 0);
+ }
+ if (done) {
+ delete this;
+ }
+}
+
+void AsyncConnect::OnWritable(absl::Status status)
+ ABSL_NO_THREAD_SAFETY_ANALYSIS {
+ int so_error = 0;
+ socklen_t so_error_size;
+ int err;
+ int done;
+ int consumed_refs = 1;
+ EventHandle* fd;
+ absl::StatusOr> ep;
+
+ mu_.Lock();
+ GPR_ASSERT(fd_ != nullptr);
+ fd = std::exchange(fd_, nullptr);
+ bool connect_cancelled = connect_cancelled_;
+ if (fd->IsHandleShutdown() && status.ok()) {
+ if (!connect_cancelled) {
+ // status is OK and handle has been shutdown but the connect was not
+ // cancelled. This can happen if the timeout expired and the while the
+ // OnWritable just started executing.
+ status = absl::DeadlineExceededError("connect() timed out");
+ } else {
+ // This can happen if the connection was cancelled while the OnWritable
+ // just started executing.
+ status = absl::FailedPreconditionError("Connection cancelled");
+ }
+ }
+ mu_.Unlock();
+
+ if (engine_->Cancel(alarm_handle_)) {
+ ++consumed_refs;
+ }
+
+ auto on_writable_finish = absl::MakeCleanup([&]() -> void {
+ mu_.AssertHeld();
+ if (!connect_cancelled) {
+ reinterpret_cast(engine_.get())
+ ->OnConnectFinishInternal(connection_handle_);
+ }
+ if (fd != nullptr) {
+ fd->OrphanHandle(nullptr, nullptr, "tcp_client_orphan");
+ fd = nullptr;
+ }
+ if (!status.ok()) {
+ ep = absl::CancelledError(
+ absl::StrCat("Failed to connect to remote host: ", resolved_addr_str_,
+ " with error: ", status.ToString()));
+ }
+ // Run the OnConnect callback asynchronously.
+ if (!connect_cancelled) {
+ executor_->Run(
+ [ep = std::move(ep), on_connect = std::move(on_connect_)]() mutable {
+ if (on_connect) {
+ on_connect(std::move(ep));
+ }
+ });
+ }
+ done = ((refs_ -= consumed_refs) == 0);
+ mu_.Unlock();
+ if (done) {
+ delete this;
+ }
+ });
+
+ mu_.Lock();
+ if (!status.ok() || connect_cancelled) {
+ return;
+ }
+
+ do {
+ so_error_size = sizeof(so_error);
+ err = getsockopt(fd->WrappedFd(), SOL_SOCKET, SO_ERROR, &so_error,
+ &so_error_size);
+ } while (err < 0 && errno == EINTR);
+ if (err < 0) {
+ status = absl::FailedPreconditionError(
+ absl::StrCat("getsockopt: ", std::strerror(errno)));
+ return;
+ }
+
+ switch (so_error) {
+ case 0:
+ ep = CreatePosixEndpoint(fd, nullptr, engine_, std::move(allocator_),
+ options_);
+ fd = nullptr;
+ break;
+ case ENOBUFS:
+ // We will get one of these errors if we have run out of
+ // memory in the kernel for the data structures allocated
+ // when you connect a socket. If this happens it is very
+ // likely that if we wait a little bit then try again the
+ // connection will work (since other programs or this
+ // program will close their network connections and free up
+ // memory). This does _not_ indicate that there is anything
+ // wrong with the server we are connecting to, this is a
+ // local problem.
+
+ // If you are looking at this code, then chances are that
+ // your program or another program on the same computer
+ // opened too many network connections. The "easy" fix:
+ // don't do that!
+ gpr_log(GPR_ERROR, "kernel out of buffers");
+ mu_.Unlock();
+ fd->NotifyOnWrite(on_writable_);
+ // Don't run the cleanup function for this case.
+ std::move(on_writable_finish).Cancel();
+ return;
+ case ECONNREFUSED:
+ // This error shouldn't happen for anything other than connect().
+ status = absl::FailedPreconditionError(
+ absl::StrCat("connect: ", std::strerror(so_error)));
+ break;
+ default:
+ // We don't really know which syscall triggered the problem here, so
+ // punt by reporting getsockopt().
+ status = absl::FailedPreconditionError(
+ absl::StrCat("getsockopt(SO_ERROR): ", std::strerror(so_error)));
+ break;
+ }
+}
+
+EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
+ PosixSocketWrapper sock, OnConnectCallback on_connect, ResolvedAddress addr,
+ MemoryAllocator&& allocator, const PosixTcpOptions& options,
+ Duration timeout) {
+ int err;
+ int saved_errno;
+ do {
+ err = connect(sock.Fd(), addr.address(), addr.size());
+ } while (err < 0 && errno == EINTR);
+ saved_errno = errno;
+
+ auto addr_uri = SockaddrToString(&addr, true);
+ if (!addr_uri.ok()) {
+ Run([on_connect = std::move(on_connect),
+ ep = absl::FailedPreconditionError(absl::StrCat(
+ "connect failed: ", "invalid addr: ",
+ addr_uri.value()))]() mutable { on_connect(std::move(ep)); });
+ return {0, 0};
+ }
+
+ std::string name = absl::StrCat("tcp-client:", addr_uri.value());
+ PosixEventPoller* poller = poller_manager_->Poller();
+ EventHandle* handle =
+ poller->CreateHandle(sock.Fd(), name, poller->CanTrackErrors());
+ int64_t connection_id = 0;
+ if (saved_errno == EWOULDBLOCK || saved_errno == EINPROGRESS) {
+ // Connection is still in progress.
+ connection_id = last_connection_id_.fetch_add(1, std::memory_order_acq_rel);
+ }
+
+ if (err >= 0) {
+ // Connection already succeded. Return 0 to discourage any cancellation
+ // attempts.
+ Run([on_connect = std::move(on_connect),
+ ep = CreatePosixEndpoint(handle, nullptr, shared_from_this(),
+ std::move(allocator), options)]() mutable {
+ on_connect(std::move(ep));
+ });
+ return {0, 0};
+ }
+ if (saved_errno != EWOULDBLOCK && saved_errno != EINPROGRESS) {
+ // Connection already failed. Return 0 to discourage any cancellation
+ // attempts.
+ handle->OrphanHandle(nullptr, nullptr, "tcp_client_connect_error");
+ Run([on_connect = std::move(on_connect),
+ ep = absl::FailedPreconditionError(
+ absl::StrCat("connect failed: ", "addr: ", addr_uri.value(),
+ " error: ", std::strerror(saved_errno)))]() mutable {
+ on_connect(std::move(ep));
+ });
+ return {0, 0};
+ }
+ AsyncConnect* ac = new AsyncConnect(
+ std::move(on_connect), shared_from_this(), executor_.get(), handle,
+ std::move(allocator), options, addr_uri.value(), connection_id);
+ int shard_number = connection_id % connection_shards_.size();
+ struct ConnectionShard* shard = &connection_shards_[shard_number];
+ {
+ grpc_core::MutexLock lock(&shard->mu);
+ shard->pending_connections.insert_or_assign(connection_id, ac);
+ }
+ // Start asynchronous connect and return the connection id.
+ ac->Start(timeout);
+ return {static_cast(connection_id), 0};
+}
+
+void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
+ int shard_number = connection_handle % connection_shards_.size();
+ struct ConnectionShard* shard = &connection_shards_[shard_number];
+ {
+ grpc_core::MutexLock lock(&shard->mu);
+ shard->pending_connections.erase(connection_handle);
+ }
+}
PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr executor)
@@ -94,12 +332,16 @@ PosixEnginePollerManager::~PosixEnginePollerManager() {
}
PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
- : executor_(std::make_shared()), timer_manager_(executor_) {
+ : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
+ executor_(std::make_shared()),
+ timer_manager_(executor_) {
poller_manager_ = std::make_shared(poller);
}
PosixEventEngine::PosixEventEngine()
- : executor_(std::make_shared()), timer_manager_(executor_) {
+ : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
+ executor_(std::make_shared()),
+ timer_manager_(executor_) {
if (grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
poller_manager_ = std::make_shared(executor_);
if (poller_manager_->Poller() != nullptr) {
@@ -240,15 +482,87 @@ bool PosixEventEngine::IsWorkerThread() {
GPR_ASSERT(false && "unimplemented");
}
-bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle /*handle*/) {
- GPR_ASSERT(false && "unimplemented");
+bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
+#ifdef GRPC_POSIX_SOCKET_TCP
+ int connection_handle = handle.keys[0];
+ if (connection_handle <= 0) {
+ return false;
+ }
+ int shard_number = connection_handle % connection_shards_.size();
+ struct ConnectionShard* shard = &connection_shards_[shard_number];
+ AsyncConnect* ac = nullptr;
+ {
+ grpc_core::MutexLock lock(&shard->mu);
+ auto it = shard->pending_connections.find(connection_handle);
+ if (it != shard->pending_connections.end()) {
+ ac = it->second;
+ GPR_ASSERT(ac != nullptr);
+ // Trying to acquire ac->mu here would could cause a deadlock because
+ // the OnWritable method tries to acquire the two mutexes used
+ // here in the reverse order. But we dont need to acquire ac->mu before
+ // incrementing ac->refs here. This is because the OnWritable
+ // method decrements ac->refs only after deleting the connection handle
+ // from the corresponding hashmap. If the code enters here, it means
+ // that deletion hasn't happened yet. The deletion can only happen after
+ // the corresponding g_shard_mu is unlocked.
+ ++ac->refs_;
+ // Remove connection from list of active connections.
+ shard->pending_connections.erase(it);
+ }
+ }
+ if (ac == nullptr) {
+ return false;
+ }
+ ac->mu_.Lock();
+ bool connection_cancel_success = (ac->fd_ != nullptr);
+ if (connection_cancel_success) {
+ // Connection is still pending. The OnWritable callback hasn't executed
+ // yet because ac->fd != nullptr.
+ ac->connect_cancelled_ = true;
+ // Shutdown the fd. This would cause OnWritable to run as soon as
+ // possible. We dont need to pass a custom error here because it wont be
+ // used since the on_connect_closure is not run if connect cancellation is
+ // successfull.
+ ac->fd_->ShutdownHandle(
+ absl::FailedPreconditionError("Connection cancelled"));
+ }
+ bool done = (--ac->refs_ == 0);
+ ac->mu_.Unlock();
+ if (done) {
+ delete ac;
+ }
+ return connection_cancel_success;
+#else // GRPC_POSIX_SOCKET_TCP
+ GPR_ASSERT(false &&
+ "EventEngine::CancelConnect is not supported on this platform");
+#endif // GRPC_POSIX_SOCKET_TCP
}
EventEngine::ConnectionHandle PosixEventEngine::Connect(
- OnConnectCallback /*on_connect*/, const ResolvedAddress& /*addr*/,
- const EndpointConfig& /*args*/, MemoryAllocator /*memory_allocator*/,
- Duration /*timeout*/) {
- GPR_ASSERT(false && "unimplemented");
+ OnConnectCallback on_connect, const ResolvedAddress& addr,
+ const EndpointConfig& args, MemoryAllocator memory_allocator,
+ Duration timeout) {
+#ifdef GRPC_POSIX_SOCKET_TCP
+ if (!grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
+ GPR_ASSERT(
+ false &&
+ "EventEngine::Connect is not supported because polling is not enabled");
+ }
+ GPR_ASSERT(poller_manager_ != nullptr);
+ PosixTcpOptions options = TcpOptionsFromEndpointConfig(args);
+ absl::StatusOr socket =
+ PosixSocketWrapper::CreateAndPrepareTcpClientSocket(options, addr);
+ if (!socket.ok()) {
+ Run([on_connect = std::move(on_connect),
+ status = socket.status()]() mutable { on_connect(status); });
+ return {0, 0};
+ }
+ return ConnectInternal((*socket).sock, std::move(on_connect),
+ (*socket).mapped_target_addr,
+ std::move(memory_allocator), options, timeout);
+#else // GRPC_POSIX_SOCKET_TCP
+ GPR_ASSERT(false && "EventEngine::Connect is not supported on this platform");
+#endif // GRPC_POSIX_SOCKET_TCP
}
absl::StatusOr>
diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h
index e7c333ebd5b..31b188a9fb9 100644
--- a/src/core/lib/event_engine/posix_engine/posix_engine.h
+++ b/src/core/lib/event_engine/posix_engine/posix_engine.h
@@ -15,13 +15,17 @@
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H
#include
-#include
-
#include
+#include
#include
+#include
+#include
+#include
#include "absl/base/thread_annotations.h"
+#include "absl/container/flat_hash_map.h"
#include "absl/functional/any_invocable.h"
+#include "absl/hash/hash.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
@@ -38,10 +42,58 @@
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/surface/init_internally.h"
+#ifdef GRPC_POSIX_SOCKET_TCP
+#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
+#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
+#endif // GRPC_POSIX_SOCKET_TCP
+
namespace grpc_event_engine {
namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
+// A helper class to handle asynchronous connect operations.
+class AsyncConnect {
+ public:
+ AsyncConnect(EventEngine::OnConnectCallback on_connect,
+ std::shared_ptr engine, ThreadPool* executor,
+ grpc_event_engine::posix_engine::EventHandle* fd,
+ MemoryAllocator&& allocator,
+ const grpc_event_engine::posix_engine::PosixTcpOptions& options,
+ std::string resolved_addr_str, int64_t connection_handle)
+ : on_connect_(std::move(on_connect)),
+ engine_(engine),
+ executor_(executor),
+ fd_(fd),
+ allocator_(std::move(allocator)),
+ options_(options),
+ resolved_addr_str_(resolved_addr_str),
+ connection_handle_(connection_handle),
+ connect_cancelled_(false) {}
+
+ void Start(EventEngine::Duration timeout);
+ ~AsyncConnect();
+
+ private:
+ friend class PosixEventEngine;
+ void OnTimeoutExpired(absl::Status status);
+
+ void OnWritable(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ grpc_core::Mutex mu_;
+ grpc_event_engine::posix_engine::PosixEngineClosure* on_writable_ = nullptr;
+ EventEngine::OnConnectCallback on_connect_;
+ std::shared_ptr engine_;
+ ThreadPool* executor_;
+ EventEngine::TaskHandle alarm_handle_;
+ int refs_{2};
+ grpc_event_engine::posix_engine::EventHandle* fd_;
+ MemoryAllocator allocator_;
+ grpc_event_engine::posix_engine::PosixTcpOptions options_;
+ std::string resolved_addr_str_;
+ int64_t connection_handle_;
+ bool connect_cancelled_;
+};
+
// A helper class to manager lifetime of the poller associated with the
// posix event engine.
class PosixEnginePollerManager
@@ -153,8 +205,28 @@ class PosixEventEngine final : public EventEngine,
absl::AnyInvocable cb);
#ifdef GRPC_POSIX_SOCKET_TCP
+ friend class AsyncConnect;
+ struct ConnectionShard {
+ grpc_core::Mutex mu;
+ absl::flat_hash_map pending_connections
+ ABSL_GUARDED_BY(&mu);
+ };
+
static void PollerWorkInternal(
std::shared_ptr poller_manager);
+
+ ConnectionHandle ConnectInternal(
+ grpc_event_engine::posix_engine::PosixSocketWrapper sock,
+ OnConnectCallback on_connect, ResolvedAddress addr,
+ MemoryAllocator&& allocator,
+ const grpc_event_engine::posix_engine::PosixTcpOptions& options,
+ Duration timeout);
+
+ void OnConnectFinishInternal(int connection_handle);
+
+ std::vector connection_shards_;
+ std::atomic last_connection_id_{1};
+
#endif // GRPC_POSIX_SOCKET_TCP
grpc_core::Mutex mu_;
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 7294e274f6b..f3d05fc9a1d 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -456,10 +456,13 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc',
'src/core/lib/event_engine/posix_engine/internal_errqueue.cc',
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
+ 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
+ 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',
'src/core/lib/event_engine/posix_engine/timer_manager.cc',
+ 'src/core/lib/event_engine/posix_engine/traced_buffer_list.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
@@ -509,6 +512,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/gprpp/fork.cc',
'src/core/lib/gprpp/global_config_env.cc',
'src/core/lib/gprpp/host_port.cc',
+ 'src/core/lib/gprpp/load_file.cc',
'src/core/lib/gprpp/mpscq.cc',
'src/core/lib/gprpp/stat_posix.cc',
'src/core/lib/gprpp/stat_windows.cc',
diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD
index 5a6f63a3d67..c3282c49666 100644
--- a/test/core/event_engine/posix/BUILD
+++ b/test/core/event_engine/posix/BUILD
@@ -104,6 +104,9 @@ grpc_cc_test(
srcs = ["lock_free_event_test.cc"],
external_deps = ["gtest"],
language = "C++",
+ tags = [
+ "no_windows",
+ ],
uses_event_engine = True,
uses_polling = False,
deps = [
@@ -209,3 +212,26 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
+
+grpc_cc_test(
+ name = "posix_event_engine_connect_test",
+ srcs = ["posix_event_engine_connect_test.cc"],
+ external_deps = ["gtest"],
+ language = "C++",
+ tags = [
+ "no_windows",
+ ],
+ uses_event_engine = True,
+ uses_polling = True,
+ deps = [
+ "//src/core:event_engine_poller",
+ "//src/core:posix_event_engine",
+ "//src/core:posix_event_engine_closure",
+ "//src/core:posix_event_engine_endpoint",
+ "//src/core:posix_event_engine_event_poller",
+ "//src/core:posix_event_engine_poller_posix_default",
+ "//test/core/event_engine/test_suite:conformance_test_base_lib",
+ "//test/core/event_engine/test_suite:oracle_event_engine_posix",
+ "//test/core/util:grpc_test_util",
+ ],
+)
diff --git a/test/core/event_engine/posix/posix_event_engine_connect_test.cc b/test/core/event_engine/posix/posix_event_engine_connect_test.cc
new file mode 100644
index 00000000000..63fcd6e1caa
--- /dev/null
+++ b/test/core/event_engine/posix/posix_event_engine_connect_test.cc
@@ -0,0 +1,222 @@
+// Copyright 2022 gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "absl/memory/memory.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/str_cat.h"
+#include "gtest/gtest.h"
+
+#include
+#include
+#include
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
+#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
+#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
+#include "src/core/lib/experiments/experiments.h"
+#include "src/core/lib/gprpp/notification.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
+#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+namespace grpc_event_engine {
+namespace posix_engine {
+
+using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
+using ::grpc_event_engine::experimental::EventEngine;
+using ::grpc_event_engine::experimental::PosixEventEngine;
+using ::grpc_event_engine::experimental::URIToResolvedAddress;
+using ::grpc_event_engine::experimental::WaitForSingleOwner;
+using namespace std::chrono_literals;
+
+namespace {
+
+// Creates a server socket listening for one connection on a specific port. It
+// then creates another client socket connected to the server socket. This fills
+// up the kernel listen queue on the server socket. Any subsequent attempts to
+// connect to the server socket will be pending indefinitely. This can be used
+// to test Connection timeouts and cancellation attempts.
+std::vector CreateConnectedSockets(
+ EventEngine::ResolvedAddress resolved_addr) {
+ int server_socket;
+ int opt = -1;
+ int client_socket;
+ int one = 1;
+ int flags;
+ std::vector ret_sockets;
+ // Creating a new socket file descriptor.
+ if ((server_socket = socket(AF_INET6, SOCK_STREAM, 0)) <= 0) {
+ gpr_log(GPR_ERROR, "Error creating socket: %s", std::strerror(errno));
+ abort();
+ }
+ // MacOS builds fail if SO_REUSEADDR and SO_REUSEPORT are set in the same
+ // setsockopt syscall. So they are set separately one after the other.
+ if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
+ gpr_log(GPR_ERROR, "Error setsockopt(SO_REUSEADDR): %s",
+ std::strerror(errno));
+ abort();
+ }
+ if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))) {
+ gpr_log(GPR_ERROR, "Error setsockopt(SO_REUSEPORT): %s",
+ std::strerror(errno));
+ abort();
+ }
+
+ // Bind the new socket to server address.
+ if (bind(server_socket, resolved_addr.address(), resolved_addr.size()) < 0) {
+ gpr_log(GPR_ERROR, "Error bind: %s", std::strerror(errno));
+ abort();
+ }
+ // Set the new socket to listen for one active connection at a time.
+ // accept() is intentionally not called on the socket. This allows the
+ // connection queue to build up.
+ if (listen(server_socket, 1) < 0) {
+ gpr_log(GPR_ERROR, "Error listen: %s", std::strerror(errno));
+ abort();
+ }
+ ret_sockets.push_back(server_socket);
+ // Create and connect client sockets until the connection attempt times out.
+ // Even if the backlog specified to listen is 1, the kernel continues to
+ // accept a certain number of SYN packets before dropping them. This loop
+ // attempts to identify the number of new connection attempts that will
+ // be allowed by the kernel before any subsequent connection attempts
+ // become pending indefinitely.
+ while (true) {
+ client_socket = socket(AF_INET6, SOCK_STREAM, 0);
+ setsockopt(client_socket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ // Make fd non-blocking.
+ flags = fcntl(client_socket, F_GETFL, 0);
+ EXPECT_EQ(fcntl(client_socket, F_SETFL, flags | O_NONBLOCK), 0);
+
+ if (connect(client_socket,
+ const_cast(resolved_addr.address()),
+ resolved_addr.size()) == -1) {
+ if (errno == EINPROGRESS) {
+ struct pollfd pfd;
+ pfd.fd = client_socket;
+ pfd.events = POLLOUT;
+ pfd.revents = 0;
+ int ret = poll(&pfd, 1, 1000);
+ if (ret == -1) {
+ gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
+ abort();
+ } else if (ret == 0) {
+ // current connection attempt timed out. It indicates that the
+ // kernel will cause any subsequent connection attempts to
+ // become pending indefinitely.
+ ret_sockets.push_back(client_socket);
+ return ret_sockets;
+ }
+ } else {
+ gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
+ abort();
+ }
+ }
+ ret_sockets.push_back(client_socket);
+ }
+ return ret_sockets;
+}
+
+} // namespace
+
+TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
+ std::string target_addr = absl::StrCat(
+ "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
+ auto resolved_addr = URIToResolvedAddress(target_addr);
+ std::shared_ptr posix_ee = std::make_shared();
+ std::string resolved_addr_str =
+ SockaddrToString(&resolved_addr, true).value();
+ auto sockets = CreateConnectedSockets(resolved_addr);
+ grpc_core::Notification signal;
+ grpc_core::ChannelArgs args;
+ auto quota = grpc_core::ResourceQuota::Default();
+ args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
+ ChannelArgsEndpointConfig config(args);
+ auto memory_quota = absl::make_unique("bar");
+ posix_ee->Connect(
+ [&signal](absl::StatusOr> status) {
+ EXPECT_EQ(status.status().code(), absl::StatusCode::kCancelled);
+ signal.Notify();
+ },
+ URIToResolvedAddress(target_addr), config,
+ memory_quota->CreateMemoryAllocator("conn-1"), 3s);
+ signal.WaitForNotification();
+ for (auto sock : sockets) {
+ close(sock);
+ }
+ WaitForSingleOwner(std::move(posix_ee));
+}
+
+TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
+ std::string target_addr = absl::StrCat(
+ "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
+ auto resolved_addr = URIToResolvedAddress(target_addr);
+ std::shared_ptr posix_ee = std::make_shared();
+ std::string resolved_addr_str =
+ SockaddrToString(&resolved_addr, true).value();
+ auto sockets = CreateConnectedSockets(resolved_addr);
+ grpc_core::ChannelArgs args;
+ auto quota = grpc_core::ResourceQuota::Default();
+ args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
+ ChannelArgsEndpointConfig config(args);
+ auto memory_quota = absl::make_unique("bar");
+ auto connection_handle = posix_ee->Connect(
+ [](absl::StatusOr> /*status*/) {
+ FAIL() << "The on_connect callback should not have run since the "
+ "connection attempt was cancelled.";
+ },
+ URIToResolvedAddress(target_addr), config,
+ memory_quota->CreateMemoryAllocator("conn-2"), 3s);
+ if (connection_handle.keys[0] > 0) {
+ ASSERT_TRUE(posix_ee->CancelConnect(connection_handle));
+ }
+ for (auto sock : sockets) {
+ close(sock);
+ }
+ WaitForSingleOwner(std::move(posix_ee));
+}
+
+} // namespace posix_engine
+} // namespace grpc_event_engine
+
+int main(int argc, char** argv) {
+ grpc::testing::TestEnvironment env(&argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ if (!grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
+ return 0;
+ }
+ grpc_init();
+ int ret = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return ret;
+}
diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD
index 51b1ccaadbc..3aad5e41f21 100644
--- a/test/core/event_engine/test_suite/BUILD
+++ b/test/core/event_engine/test_suite/BUILD
@@ -80,8 +80,11 @@ grpc_cc_test(
tags = [
"no_windows",
],
- uses_polling = False,
+ uses_event_engine = True,
+ uses_polling = True,
deps = [
+ ":client",
+ ":oracle_event_engine_posix",
"//src/core:posix_event_engine",
"//test/core/event_engine/test_suite:timer",
],
diff --git a/test/core/event_engine/test_suite/event_engine_test.cc b/test/core/event_engine/test_suite/event_engine_test.cc
index a3e115d08ef..c852067b79e 100644
--- a/test/core/event_engine/test_suite/event_engine_test.cc
+++ b/test/core/event_engine/test_suite/event_engine_test.cc
@@ -17,19 +17,21 @@
#include
-std::function()>*
+absl::AnyInvocable<
+ std::unique_ptr()>*
g_ee_factory = nullptr;
-std::function()>*
+absl::AnyInvocable<
+ std::unique_ptr()>*
g_oracle_ee_factory = nullptr;
void SetEventEngineFactories(
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
factory,
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
oracle_ee_factory) {
- testing::AddGlobalTestEnvironment(
- new EventEngineTestEnvironment(factory, oracle_ee_factory));
+ testing::AddGlobalTestEnvironment(new EventEngineTestEnvironment(
+ std::move(factory), std::move(oracle_ee_factory)));
}
diff --git a/test/core/event_engine/test_suite/event_engine_test.h b/test/core/event_engine/test_suite/event_engine_test.h
index 1e74792e852..60b00982b36 100644
--- a/test/core/event_engine/test_suite/event_engine_test.h
+++ b/test/core/event_engine/test_suite/event_engine_test.h
@@ -14,19 +14,21 @@
#ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_H
#define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_EVENT_ENGINE_TEST_H
-#include
#include
+#include
#include
+#include "absl/functional/any_invocable.h"
+
#include
#include
-extern std::function<
+extern absl::AnyInvocable<
std::unique_ptr()>*
g_ee_factory;
-extern std::function<
+extern absl::AnyInvocable<
std::unique_ptr()>*
g_oracle_ee_factory;
@@ -34,13 +36,14 @@ extern std::function<
class EventEngineTestEnvironment : public testing::Environment {
public:
EventEngineTestEnvironment(
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
factory,
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
oracle_factory)
- : factory_(factory), oracle_factory_(oracle_factory) {}
+ : factory_(std::move(factory)),
+ oracle_factory_(std::move(oracle_factory)) {}
void SetUp() override {
g_ee_factory = &factory_;
@@ -53,9 +56,11 @@ class EventEngineTestEnvironment : public testing::Environment {
}
private:
- std::function()>
+ absl::AnyInvocable<
+ std::unique_ptr()>
factory_;
- std::function()>
+ absl::AnyInvocable<
+ std::unique_ptr()>
oracle_factory_;
};
@@ -77,10 +82,10 @@ class EventEngineTest : public testing::Test {
// Set a custom factory for the EventEngine test suite. An optional oracle
// EventEngine can additionally be specified here.
void SetEventEngineFactories(
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
ee_factory,
- std::function<
+ absl::AnyInvocable<
std::unique_ptr()>
oracle_ee_factory);
diff --git a/test/core/event_engine/test_suite/posix_event_engine_test.cc b/test/core/event_engine/test_suite/posix_event_engine_test.cc
index e31e5126b1b..e08e2e3c49e 100644
--- a/test/core/event_engine/test_suite/posix_event_engine_test.cc
+++ b/test/core/event_engine/test_suite/posix_event_engine_test.cc
@@ -19,18 +19,26 @@
#include
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
+#include "src/core/lib/experiments/experiments.h"
#include "test/core/event_engine/test_suite/event_engine_test.h"
+#include "test/core/event_engine/test_suite/oracle_event_engine_posix.h"
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
+ if (!grpc_core::IsPosixEventEngineEnablePollingEnabled()) {
+ return 0;
+ }
SetEventEngineFactories(
[]() {
return std::make_unique<
grpc_event_engine::experimental::PosixEventEngine>();
},
- nullptr);
+ []() {
+ return std::make_unique<
+ grpc_event_engine::experimental::PosixOracleEventEngine>();
+ });
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index f82125acb57..62b27efa703 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -1973,15 +1973,21 @@ src/core/lib/event_engine/posix_engine/internal_errqueue.cc \
src/core/lib/event_engine/posix_engine/internal_errqueue.h \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
src/core/lib/event_engine/posix_engine/lockfree_event.h \
+src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
+src/core/lib/event_engine/posix_engine/posix_endpoint.h \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine.h \
src/core/lib/event_engine/posix_engine/posix_engine_closure.h \
+src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
+src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer.h \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_heap.h \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
src/core/lib/event_engine/posix_engine/timer_manager.h \
+src/core/lib/event_engine/posix_engine/traced_buffer_list.cc \
+src/core/lib/event_engine/posix_engine/traced_buffer_list.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
@@ -2066,6 +2072,8 @@ src/core/lib/gprpp/global_config_env.h \
src/core/lib/gprpp/global_config_generic.h \
src/core/lib/gprpp/host_port.cc \
src/core/lib/gprpp/host_port.h \
+src/core/lib/gprpp/load_file.cc \
+src/core/lib/gprpp/load_file.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/match.h \
src/core/lib/gprpp/memory.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 8aeac6f6303..8a86e03d8cc 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1758,15 +1758,21 @@ src/core/lib/event_engine/posix_engine/internal_errqueue.cc \
src/core/lib/event_engine/posix_engine/internal_errqueue.h \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
src/core/lib/event_engine/posix_engine/lockfree_event.h \
+src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
+src/core/lib/event_engine/posix_engine/posix_endpoint.h \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine.h \
src/core/lib/event_engine/posix_engine/posix_engine_closure.h \
+src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
+src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer.h \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
src/core/lib/event_engine/posix_engine/timer_heap.h \
src/core/lib/event_engine/posix_engine/timer_manager.cc \
src/core/lib/event_engine/posix_engine/timer_manager.h \
+src/core/lib/event_engine/posix_engine/traced_buffer_list.cc \
+src/core/lib/event_engine/posix_engine/traced_buffer_list.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_eventfd.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
@@ -1853,6 +1859,8 @@ src/core/lib/gprpp/global_config_env.h \
src/core/lib/gprpp/global_config_generic.h \
src/core/lib/gprpp/host_port.cc \
src/core/lib/gprpp/host_port.h \
+src/core/lib/gprpp/load_file.cc \
+src/core/lib/gprpp/load_file.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/match.h \
src/core/lib/gprpp/memory.h \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index ca0a9024f2c..7477b4411bb 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -4415,8 +4415,7 @@
"ci_platforms": [
"linux",
"mac",
- "posix",
- "windows"
+ "posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
@@ -4428,8 +4427,7 @@
"platforms": [
"linux",
"mac",
- "posix",
- "windows"
+ "posix"
],
"uses_polling": false
},
@@ -5355,6 +5353,28 @@
],
"uses_polling": true
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "posix_event_engine_connect_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "uses_polling": true
+ },
{
"args": [],
"benchmark": false,
@@ -5375,7 +5395,7 @@
"mac",
"posix"
],
- "uses_polling": false
+ "uses_polling": true
},
{
"args": [],