diff --git a/BUILD b/BUILD
index ab6cdc11d98..22966d7268a 100644
--- a/BUILD
+++ b/BUILD
@@ -86,11 +86,26 @@ config_setting(
values = {"crosstool_top": "//external:android/crosstool"},
)
+config_setting(
+ name = "macos",
+ values = {"apple_platform_type": "macos"},
+)
+
config_setting(
name = "ios",
values = {"apple_platform_type": "ios"},
)
+config_setting(
+ name = "tvos",
+ values = {"apple_platform_type": "tvos"},
+)
+
+config_setting(
+ name = "watchos",
+ values = {"apple_platform_type": "watchos"},
+)
+
config_setting(
name = "systemd",
values = {"define": "use_systemd=true"},
@@ -174,10 +189,15 @@ config_setting(
)
config_setting(
- name = "mac_x86_64",
+ name = "mac",
values = {"cpu": "darwin"},
)
+config_setting(
+ name = "mac_x86_64",
+ values = {"cpu": "darwin_x86_64"},
+)
+
config_setting(
name = "mac_arm64",
values = {"cpu": "darwin_arm64"},
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7546d63d75d..b0f940c7d65 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -863,6 +863,9 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx cel_authorization_engine_test)
add_dependencies(buildtests_cxx certificate_provider_registry_test)
add_dependencies(buildtests_cxx certificate_provider_store_test)
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+ add_dependencies(buildtests_cxx cf_engine_test)
+ endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx cf_event_engine_test)
endif()
@@ -1107,10 +1110,10 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_engine_listener_utils_test)
endif()
- if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_connect_test)
endif()
- if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
add_dependencies(buildtests_cxx prioritized_race_test)
@@ -2047,6 +2050,8 @@ add_library(grpc
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/cf_engine/cf_engine.cc
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
@@ -2745,6 +2750,8 @@ add_library(grpc_unsecure
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/cf_engine/cf_engine.cc
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
@@ -4275,6 +4282,8 @@ add_library(grpc_authorization_provider
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/cf_engine/cf_engine.cc
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
@@ -7470,15 +7479,55 @@ target_link_libraries(certificate_provider_store_test
)
+endif()
+if(gRPC_BUILD_TESTS)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+
+ add_executable(cf_engine_test
+ test/core/event_engine/cf/cf_engine_test.cc
+ third_party/googletest/googletest/src/gtest-all.cc
+ third_party/googletest/googlemock/src/gmock-all.cc
+ )
+ target_compile_features(cf_engine_test PUBLIC cxx_std_14)
+ target_include_directories(cf_engine_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+ )
+
+ target_link_libraries(cf_engine_test
+ ${_gRPC_BASELIB_LIBRARIES}
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ ${_gRPC_ZLIB_LIBRARIES}
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ grpc_test_util
+ )
+
+
+endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(cf_event_engine_test
- src/core/lib/event_engine/cf_engine/cf_engine.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/test_suite/cf_event_engine_test.cc
test/core/event_engine/test_suite/event_engine_test_framework.cc
+ test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
+ test/core/event_engine/test_suite/tests/client_test.cc
test/core/event_engine/test_suite/tests/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@@ -11464,6 +11513,8 @@ add_executable(frame_test
src/core/lib/debug/stats.cc
src/core/lib/debug/stats_data.cc
src/core/lib/debug/trace.cc
+ src/core/lib/event_engine/cf_engine/cf_engine.cc
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
src/core/lib/event_engine/channel_args_endpoint_config.cc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
@@ -16666,7 +16717,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(posix_event_engine_connect_test
test/core/event_engine/event_engine_test_utils.cc
@@ -16709,7 +16760,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
endif()
endif()
if(gRPC_BUILD_TESTS)
-if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
+if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_executable(posix_event_engine_test
test/core/event_engine/event_engine_test_utils.cc
diff --git a/Makefile b/Makefile
index 6a3791ae0ce..1dde9ff0511 100644
--- a/Makefile
+++ b/Makefile
@@ -1432,6 +1432,8 @@ LIBGRPC_SRC = \
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
+ src/core/lib/event_engine/cf_engine/cf_engine.cc \
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
@@ -1984,6 +1986,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
+ src/core/lib/event_engine/cf_engine/cf_engine.cc \
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 5ed94d2f898..f723640d935 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -676,6 +676,9 @@ libs:
- src/core/lib/debug/stats.h
- src/core/lib/debug/stats_data.h
- src/core/lib/debug/trace.h
+ - src/core/lib/event_engine/cf_engine/cf_engine.h
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
+ - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
@@ -1476,6 +1479,8 @@ libs:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/cf_engine/cf_engine.cc
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
@@ -2046,6 +2051,9 @@ libs:
- src/core/lib/debug/stats.h
- src/core/lib/debug/stats_data.h
- src/core/lib/debug/trace.h
+ - src/core/lib/event_engine/cf_engine/cf_engine.h
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
+ - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
@@ -2458,6 +2466,8 @@ libs:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/cf_engine/cf_engine.cc
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
@@ -3536,6 +3546,9 @@ libs:
- src/core/lib/debug/stats.h
- src/core/lib/debug/stats_data.h
- src/core/lib/debug/trace.h
+ - src/core/lib/event_engine/cf_engine/cf_engine.h
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
+ - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
@@ -3827,6 +3840,8 @@ libs:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/cf_engine/cf_engine.cc
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
@@ -5440,20 +5455,35 @@ targets:
- test/core/xds/certificate_provider_store_test.cc
deps:
- grpc_test_util
+- name: cf_engine_test
+ gtest: true
+ build: test
+ language: c++
+ headers: []
+ src:
+ - test/core/event_engine/cf/cf_engine_test.cc
+ deps:
+ - grpc_test_util
+ platforms:
+ - linux
+ - posix
+ - mac
- name: cf_event_engine_test
gtest: true
build: test
language: c++
headers:
- - src/core/lib/event_engine/cf_engine/cf_engine.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/test_suite/event_engine_test_framework.h
+ - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h
+ - test/core/event_engine/test_suite/tests/client_test.h
- test/core/event_engine/test_suite/tests/timer_test.h
src:
- - src/core/lib/event_engine/cf_engine/cf_engine.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/test_suite/cf_event_engine_test.cc
- test/core/event_engine/test_suite/event_engine_test_framework.cc
+ - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc
+ - test/core/event_engine/test_suite/tests/client_test.cc
- test/core/event_engine/test_suite/tests/timer_test.cc
deps:
- grpc_unsecure
@@ -5462,7 +5492,6 @@ targets:
- linux
- posix
- mac
- uses_polling: false
- name: cfstream_test
gtest: true
build: test
@@ -7450,6 +7479,9 @@ targets:
- src/core/lib/debug/stats.h
- src/core/lib/debug/stats_data.h
- src/core/lib/debug/trace.h
+ - src/core/lib/event_engine/cf_engine/cf_engine.h
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
+ - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
- src/core/lib/event_engine/channel_args_endpoint_config.h
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/default_event_engine.h
@@ -7723,6 +7755,8 @@ targets:
- src/core/lib/debug/stats.cc
- src/core/lib/debug/stats_data.cc
- src/core/lib/debug/trace.cc
+ - src/core/lib/event_engine/cf_engine/cf_engine.cc
+ - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
- src/core/lib/event_engine/channel_args_endpoint_config.cc
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
@@ -10128,7 +10162,6 @@ targets:
platforms:
- linux
- posix
- - mac
- name: posix_event_engine_test
gtest: true
build: test
@@ -10154,7 +10187,6 @@ targets:
platforms:
- linux
- posix
- - mac
- name: prioritized_race_test
gtest: true
build: test
diff --git a/config.m4 b/config.m4
index 93493c33f11..058372d3705 100644
--- a/config.m4
+++ b/config.m4
@@ -514,6 +514,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/debug/stats.cc \
src/core/lib/debug/stats_data.cc \
src/core/lib/debug/trace.cc \
+ src/core/lib/event_engine/cf_engine/cf_engine.cc \
+ src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
@@ -1458,6 +1460,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/config)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/debug)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine)
+ PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/cf_engine)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/posix_engine)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/thread_pool)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/windows)
diff --git a/config.w32 b/config.w32
index 2c38b5d04b7..e921e465fc0 100644
--- a/config.w32
+++ b/config.w32
@@ -479,6 +479,8 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\debug\\stats.cc " +
"src\\core\\lib\\debug\\stats_data.cc " +
"src\\core\\lib\\debug\\trace.cc " +
+ "src\\core\\lib\\event_engine\\cf_engine\\cf_engine.cc " +
+ "src\\core\\lib\\event_engine\\cf_engine\\cfstream_endpoint.cc " +
"src\\core\\lib\\event_engine\\channel_args_endpoint_config.cc " +
"src\\core\\lib\\event_engine\\default_event_engine.cc " +
"src\\core\\lib\\event_engine\\default_event_engine_factory.cc " +
@@ -1590,6 +1592,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\config");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\debug");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine");
+ FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\cf_engine");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\posix_engine");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\thread_pool");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\windows");
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 504de1898b8..516786acfa4 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -746,6 +746,9 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats.h',
'src/core/lib/debug/stats_data.h',
'src/core/lib/debug/trace.h',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.h',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h',
+ 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h',
'src/core/lib/event_engine/channel_args_endpoint_config.h',
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
@@ -1779,6 +1782,9 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats.h',
'src/core/lib/debug/stats_data.h',
'src/core/lib/debug/trace.h',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.h',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h',
+ 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h',
'src/core/lib/event_engine/channel_args_endpoint_config.h',
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 3cb76b1d17c..22e82789fbc 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1150,6 +1150,11 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats_data.h',
'src/core/lib/debug/trace.cc',
'src/core/lib/debug/trace.h',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.cc',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.h',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h',
+ 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h',
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/channel_args_endpoint_config.h',
'src/core/lib/event_engine/common_closures.h',
@@ -2502,6 +2507,9 @@ Pod::Spec.new do |s|
'src/core/lib/debug/stats.h',
'src/core/lib/debug/stats_data.h',
'src/core/lib/debug/trace.h',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.h',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h',
+ 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h',
'src/core/lib/event_engine/channel_args_endpoint_config.h',
'src/core/lib/event_engine/common_closures.h',
'src/core/lib/event_engine/default_event_engine.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index efff2618f5c..2455d7bd126 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1056,6 +1056,11 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/debug/stats_data.h )
s.files += %w( src/core/lib/debug/trace.cc )
s.files += %w( src/core/lib/debug/trace.h )
+ s.files += %w( src/core/lib/event_engine/cf_engine/cf_engine.cc )
+ s.files += %w( src/core/lib/event_engine/cf_engine/cf_engine.h )
+ s.files += %w( src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc )
+ s.files += %w( src/core/lib/event_engine/cf_engine/cfstream_endpoint.h )
+ s.files += %w( src/core/lib/event_engine/cf_engine/cftype_unique_ref.h )
s.files += %w( src/core/lib/event_engine/channel_args_endpoint_config.cc )
s.files += %w( src/core/lib/event_engine/channel_args_endpoint_config.h )
s.files += %w( src/core/lib/event_engine/common_closures.h )
diff --git a/grpc.gyp b/grpc.gyp
index 6aca97209e5..749984e59bb 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -735,6 +735,8 @@
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.cc',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc',
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
@@ -1226,6 +1228,8 @@
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.cc',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc',
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
@@ -1741,6 +1745,8 @@
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.cc',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc',
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
diff --git a/package.xml b/package.xml
index 0ac426b684b..c3d78e43d21 100644
--- a/package.xml
+++ b/package.xml
@@ -1038,6 +1038,11 @@
+
+
+
+
+
diff --git a/src/core/BUILD b/src/core/BUILD
index a53c9117b85..f13ccda5998 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -2097,17 +2097,33 @@ grpc_cc_library(
grpc_cc_library(
name = "cf_event_engine",
- srcs = ["lib/event_engine/cf_engine/cf_engine.cc"],
- hdrs = ["lib/event_engine/cf_engine/cf_engine.h"],
+ srcs = [
+ "lib/event_engine/cf_engine/cf_engine.cc",
+ "lib/event_engine/cf_engine/cfstream_endpoint.cc",
+ ],
+ hdrs = [
+ "lib/event_engine/cf_engine/cf_engine.h",
+ "lib/event_engine/cf_engine/cfstream_endpoint.h",
+ "lib/event_engine/cf_engine/cftype_unique_ref.h",
+ ],
+ external_deps = ["absl/strings:str_format"],
deps = [
"event_engine_common",
+ "event_engine_tcp_socket_utils",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"init_internally",
+ "posix_event_engine_closure",
+ "posix_event_engine_event_poller",
+ "posix_event_engine_lockfree_event",
"posix_event_engine_timer_manager",
+ "ref_counted",
+ "strerror",
"//:event_engine_base_hdrs",
"//:gpr",
+ "//:ref_counted_ptr",
+ "//:sockaddr_utils",
],
)
@@ -2180,6 +2196,21 @@ grpc_cc_library(
"//:windows": ["windows_event_engine"],
"//:windows_msvc": ["windows_event_engine"],
"//:windows_other": ["windows_event_engine"],
+ "//:mac": [
+ "posix_event_engine",
+ "cf_event_engine",
+ ],
+ "//:mac_x86_64": [
+ "posix_event_engine",
+ "cf_event_engine",
+ ],
+ "//:mac_arm64": [
+ "posix_event_engine",
+ "cf_event_engine",
+ ],
+ "//:ios": ["cf_event_engine"],
+ "//:tvos": ["cf_event_engine"],
+ "//:watchos": ["cf_event_engine"],
"//conditions:default": ["posix_event_engine"],
}],
deps = [
diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.cc b/src/core/lib/event_engine/cf_engine/cf_engine.cc
index 86b368f9910..ecd5e4fe218 100644
--- a/src/core/lib/event_engine/cf_engine/cf_engine.cc
+++ b/src/core/lib/event_engine/cf_engine/cf_engine.cc
@@ -1,4 +1,4 @@
-// Copyright 2022 The gRPC Authors
+// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@
#ifdef GPR_APPLE
+#include
+
#include
#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
+#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
+#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
@@ -38,7 +42,7 @@ struct CFEventEngine::Closure final : public EventEngine::Closure {
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p executing callback:%s", engine,
HandleToString(handle).c_str());
{
- grpc_core::MutexLock lock(&engine->mu_);
+ grpc_core::MutexLock lock(&engine->task_mu_);
engine->known_handles_.erase(handle);
}
cb();
@@ -53,7 +57,7 @@ CFEventEngine::CFEventEngine()
CFEventEngine::~CFEventEngine() {
{
- grpc_core::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&task_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
gpr_log(GPR_ERROR,
@@ -77,14 +81,76 @@ CFEventEngine::CreateListener(
}
CFEventEngine::ConnectionHandle CFEventEngine::Connect(
- OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */,
- const EndpointConfig& /* args */, MemoryAllocator /* memory_allocator */,
- Duration /* timeout */) {
- grpc_core::Crash("unimplemented");
+ OnConnectCallback on_connect, const ResolvedAddress& addr,
+ const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
+ Duration timeout) {
+ auto endpoint_ptr = new CFStreamEndpoint(
+ std::static_pointer_cast(shared_from_this()),
+ std::move(memory_allocator));
+
+ ConnectionHandle handle{reinterpret_cast(endpoint_ptr), 0};
+ {
+ grpc_core::MutexLock lock(&conn_mu_);
+ conn_handles_.insert(handle);
+ }
+
+ auto deadline_timer =
+ RunAfter(timeout, [handle, that = std::static_pointer_cast(
+ shared_from_this())]() {
+ that->CancelConnectInternal(
+ handle, absl::DeadlineExceededError("Connect timed out"));
+ });
+
+ auto on_connect2 =
+ [that = std::static_pointer_cast(shared_from_this()),
+ deadline_timer, handle,
+ on_connect = std::move(on_connect)](absl::Status status) mutable {
+ // best effort canceling deadline timer
+ that->Cancel(deadline_timer);
+
+ {
+ grpc_core::MutexLock lock(&that->conn_mu_);
+ that->conn_handles_.erase(handle);
+ }
+
+ auto endpoint_ptr = reinterpret_cast(handle.keys[0]);
+
+ if (!status.ok()) {
+ on_connect(std::move(status));
+ delete endpoint_ptr;
+ return;
+ }
+
+ on_connect(std::unique_ptr(endpoint_ptr));
+ };
+
+ endpoint_ptr->Connect(std::move(on_connect2), addr);
+
+ return handle;
}
-bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) {
- grpc_core::Crash("unimplemented");
+bool CFEventEngine::CancelConnect(ConnectionHandle handle) {
+ CancelConnectInternal(handle, absl::CancelledError("CancelConnect"));
+ // on_connect will always be called, even if cancellation is successful
+ return false;
+}
+
+bool CFEventEngine::CancelConnectInternal(ConnectionHandle handle,
+ absl::Status status) {
+ grpc_core::MutexLock lock(&conn_mu_);
+
+ if (!conn_handles_.contains(handle)) {
+ GRPC_EVENT_ENGINE_TRACE(
+ "Unknown connection handle: %s",
+ HandleToString(handle).c_str());
+ return false;
+ }
+ conn_handles_.erase(handle);
+
+ // keep the `conn_mu_` lock to prevent endpoint_ptr from being deleted
+
+ auto endpoint_ptr = reinterpret_cast(handle.keys[0]);
+ return endpoint_ptr->CancelConnect(status);
}
bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
@@ -94,12 +160,12 @@ std::unique_ptr CFEventEngine::GetDNSResolver(
grpc_core::Crash("unimplemented");
}
-void CFEventEngine::Run(EventEngine::Closure* /* closure */) {
- grpc_core::Crash("unimplemented");
+void CFEventEngine::Run(EventEngine::Closure* closure) {
+ thread_pool_->Run(closure);
}
-void CFEventEngine::Run(absl::AnyInvocable /* closure */) {
- grpc_core::Crash("unimplemented");
+void CFEventEngine::Run(absl::AnyInvocable closure) {
+ thread_pool_->Run(std::move(closure));
}
EventEngine::TaskHandle CFEventEngine::RunAfter(Duration when,
@@ -113,7 +179,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfter(
}
bool CFEventEngine::Cancel(TaskHandle handle) {
- grpc_core::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&task_mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
@@ -130,7 +196,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfterInternal(
cd->engine = this;
EventEngine::TaskHandle handle{reinterpret_cast(cd),
aba_token_.fetch_add(1)};
- grpc_core::MutexLock lock(&mu_);
+ grpc_core::MutexLock lock(&task_mu_);
known_handles_.insert(handle);
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p scheduling callback:%s", this,
diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.h b/src/core/lib/event_engine/cf_engine/cf_engine.h
index 7b9a7f4a804..3e55ce85616 100644
--- a/src/core/lib/event_engine/cf_engine/cf_engine.h
+++ b/src/core/lib/event_engine/cf_engine/cf_engine.h
@@ -1,4 +1,4 @@
-// Copyright 2022 The gRPC Authors
+// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -20,6 +20,9 @@
#include
#include "src/core/lib/event_engine/handle_containers.h"
+#include "src/core/lib/event_engine/posix_engine/event_poller.h"
+#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
+#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/init_internally.h"
@@ -28,6 +31,7 @@ namespace grpc_event_engine {
namespace experimental {
class CFEventEngine : public EventEngine,
+ public Scheduler,
public grpc_core::KeepsGrpcInitialized {
public:
CFEventEngine();
@@ -60,9 +64,16 @@ class CFEventEngine : public EventEngine,
struct Closure;
EventEngine::TaskHandle RunAfterInternal(Duration when,
absl::AnyInvocable cb);
- grpc_core::Mutex mu_;
- TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
+
+ bool CancelConnectInternal(ConnectionHandle handle, absl::Status status);
+
+ grpc_core::Mutex task_mu_;
+ TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_);
std::atomic aba_token_{0};
+
+ grpc_core::Mutex conn_mu_;
+ ConnectionHandleSet conn_handles_ ABSL_GUARDED_BY(conn_mu_);
+
std::shared_ptr thread_pool_;
TimerManager timer_manager_;
};
diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
new file mode 100644
index 00000000000..f77a626d7f5
--- /dev/null
+++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
@@ -0,0 +1,354 @@
+// Copyright 2023 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#ifdef GPR_APPLE
+
+#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
+#include "src/core/lib/event_engine/trace.h"
+#include "src/core/lib/gprpp/strerror.h"
+
+namespace grpc_event_engine {
+namespace experimental {
+
+namespace {
+
+int kDefaultReadBufferSize = 8192;
+
+absl::Status CFErrorToStatus(CFTypeUniqueRef cf_error) {
+ if (cf_error == nullptr) {
+ return absl::OkStatus();
+ }
+ CFErrorDomain cf_domain = CFErrorGetDomain((cf_error));
+ CFIndex code = CFErrorGetCode((cf_error));
+ CFTypeUniqueRef cf_desc = CFErrorCopyDescription((cf_error));
+ char domain_buf[256];
+ char desc_buf[256];
+ CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8);
+ CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8);
+ return absl::Status(absl::StatusCode::kUnknown,
+ absl::StrFormat("(domain:%s, code:%ld, description:%s)",
+ domain_buf, code, desc_buf));
+}
+
+absl::StatusOr CFReadStreamLocallAddress(
+ CFReadStreamRef stream) {
+ CFTypeUniqueRef cf_native_handle = static_cast(
+ CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle));
+ CFSocketNativeHandle socket;
+ CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
+ (UInt8*)&socket);
+ EventEngine::ResolvedAddress addr;
+ socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
+ if (getsockname(socket, const_cast(addr.address()), &len) < 0) {
+ return absl::InternalError(
+ absl::StrCat("getsockname:", grpc_core::StrError(errno)));
+ }
+ return EventEngine::ResolvedAddress(addr.address(), len);
+}
+
+} // namespace
+
+bool CFStreamEndpointImpl::CancelConnect(absl::Status status) {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
+ "CFStreamEndpointImpl::CancelConnect: status: %s, this: %p",
+ status.ToString().c_str(), this);
+
+ return open_event_.SetShutdown(std::move(status));
+}
+
+void CFStreamEndpointImpl::Connect(
+ absl::AnyInvocable on_connect,
+ EventEngine::ResolvedAddress addr) {
+ auto addr_uri = ResolvedAddressToURI(addr);
+
+ if (!addr_uri.ok()) {
+ on_connect(std::move(addr_uri).status());
+ return;
+ }
+
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s",
+ addr_uri.value().c_str());
+
+ peer_address_ = std::move(addr);
+ auto host_port = ResolvedAddressToNormalizedString(peer_address_);
+ if (!host_port.ok()) {
+ on_connect(std::move(host_port).status());
+ return;
+ }
+
+ peer_address_string_ = host_port.value();
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
+ "CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str());
+
+ std::string host_string;
+ std::string port_string;
+ grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string);
+ CFStringRef host = CFStringCreateWithCString(NULL, host_string.c_str(),
+ kCFStringEncodingUTF8);
+ int port = ResolvedAddressGetPort(peer_address_);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_,
+ &cf_write_stream_);
+
+ CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr};
+ CFReadStreamSetClient(
+ cf_read_stream_,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ ReadCallback, &cf_context);
+ CFWriteStreamSetClient(
+ cf_write_stream_,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ WriteCallback, &cf_context);
+ CFReadStreamSetDispatchQueue(cf_read_stream_,
+ dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
+ CFWriteStreamSetDispatchQueue(
+ cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
+
+ if (!CFReadStreamOpen(cf_read_stream_)) {
+ auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
+ on_connect(std::move(status));
+ return;
+ }
+
+ if (!CFWriteStreamOpen(cf_write_stream_)) {
+ auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
+ on_connect(std::move(status));
+ return;
+ }
+
+ open_event_.NotifyOn(new PosixEngineClosure(
+ [that = Ref(),
+ on_connect = std::move(on_connect)](absl::Status status) mutable {
+ if (!status.ok()) {
+ on_connect(std::move(status));
+ return;
+ }
+
+ auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_);
+ if (!local_addr.ok()) {
+ on_connect(std::move(local_addr).status());
+ return;
+ }
+
+ that->local_address_ = local_addr.value();
+ that->local_address_string_ =
+ *ResolvedAddressToURI(that->local_address_);
+ on_connect(absl::OkStatus());
+ },
+ false /* is_permanent */));
+}
+
+/* static */ void CFStreamEndpointImpl::ReadCallback(
+ CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info) {
+ auto self = static_cast(client_callback_info);
+
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
+ "CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type, self);
+
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ // wait for write stream open completed to signal connection ready
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ ABSL_FALLTHROUGH_INTENDED;
+ case kCFStreamEventEndEncountered:
+ self->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred: {
+ auto status = CFErrorToStatus(CFReadStreamCopyError(stream));
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s",
+ status.ToString().c_str());
+
+ self->open_event_.SetShutdown(status);
+ self->read_event_.SetShutdown(status);
+ self->write_event_.SetShutdown(status);
+ } break;
+ default:
+ GPR_UNREACHABLE_CODE(return);
+ }
+}
+
+/* static */
+void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
+ void* client_callback_info) {
+ auto self = static_cast(client_callback_info);
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
+ "CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type, self);
+
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ self->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ ABSL_FALLTHROUGH_INTENDED;
+ case kCFStreamEventEndEncountered:
+ self->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred: {
+ auto status = CFErrorToStatus(CFWriteStreamCopyError(stream));
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s",
+ status.ToString().c_str());
+
+ self->open_event_.SetShutdown(status);
+ self->read_event_.SetShutdown(status);
+ self->write_event_.SetShutdown(status);
+ } break;
+ default:
+ GPR_UNREACHABLE_CODE(return);
+ }
+}
+
+CFStreamEndpointImpl::CFStreamEndpointImpl(
+ std::shared_ptr engine, MemoryAllocator memory_allocator)
+ : engine_(std::move(engine)),
+ memory_allocator_(std::move(memory_allocator)),
+ open_event_(engine_.get()),
+ read_event_(engine_.get()),
+ write_event_(engine_.get()) {
+ open_event_.InitEvent();
+ read_event_.InitEvent();
+ write_event_.InitEvent();
+}
+
+CFStreamEndpointImpl::~CFStreamEndpointImpl() {
+ open_event_.DestroyEvent();
+ read_event_.DestroyEvent();
+ write_event_.DestroyEvent();
+}
+
+void CFStreamEndpointImpl::Shutdown() {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p",
+ this);
+
+ auto shutdownStatus =
+ absl::Status(absl::StatusCode::kUnknown,
+ absl::StrFormat("Shutting down CFStreamEndpointImpl"));
+ open_event_.SetShutdown(shutdownStatus);
+ read_event_.SetShutdown(shutdownStatus);
+ write_event_.SetShutdown(shutdownStatus);
+
+ CFReadStreamSetClient(cf_read_stream_, kCFStreamEventNone, nullptr, nullptr);
+ CFWriteStreamSetClient(cf_write_stream_, kCFStreamEventNone, nullptr,
+ nullptr);
+ CFReadStreamClose(cf_read_stream_);
+ CFWriteStreamClose(cf_write_stream_);
+}
+
+bool CFStreamEndpointImpl::Read(
+ absl::AnyInvocable on_read, SliceBuffer* buffer,
+ const EventEngine::Endpoint::ReadArgs* /* args */) {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p",
+ this);
+
+ read_event_.NotifyOn(new PosixEngineClosure(
+ [that = Ref(), on_read = std::move(on_read),
+ buffer](absl::Status status) mutable {
+ if (status.ok()) {
+ that->DoRead(std::move(on_read), buffer);
+ } else {
+ on_read(status);
+ }
+ },
+ false /* is_permanent*/));
+
+ return false;
+}
+
+void CFStreamEndpointImpl::DoRead(
+ absl::AnyInvocable on_read, SliceBuffer* buffer) {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p",
+ this);
+
+ auto buffer_index = buffer->AppendIndexed(
+ Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize)));
+
+ CFIndex read_size = CFReadStreamRead(
+ cf_read_stream_,
+ internal::SliceCast(buffer->MutableSliceAt(buffer_index))
+ .begin(),
+ kDefaultReadBufferSize);
+
+ if (read_size < 0) {
+ auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld",
+ status.ToString().c_str(), read_size);
+ on_read(status);
+ return;
+ }
+
+ buffer->RemoveLastNBytes(buffer->Length() - read_size);
+ on_read(absl::OkStatus());
+}
+
+bool CFStreamEndpointImpl::Write(
+ absl::AnyInvocable on_writable, SliceBuffer* data,
+ const EventEngine::Endpoint::WriteArgs* /* args */) {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p",
+ this);
+
+ write_event_.NotifyOn(new PosixEngineClosure(
+ [that = Ref(), on_writable = std::move(on_writable),
+ data](absl::Status status) mutable {
+ if (status.ok()) {
+ that->DoWrite(std::move(on_writable), data);
+ } else {
+ on_writable(status);
+ }
+ },
+ false /* is_permanent*/));
+
+ return false;
+}
+
+void CFStreamEndpointImpl::DoWrite(
+ absl::AnyInvocable on_writable, SliceBuffer* data) {
+ GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p",
+ this);
+
+ size_t total_written_size = 0;
+ for (size_t i = 0; i < data->Count(); i++) {
+ auto slice = data->RefSlice(i);
+ size_t written_size =
+ CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size());
+
+ total_written_size += written_size;
+ if (written_size < slice.size()) {
+ SliceBuffer written;
+ data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written);
+
+ write_event_.NotifyOn(new PosixEngineClosure(
+ [that = Ref(), on_writable = std::move(on_writable),
+ data](absl::Status status) mutable {
+ if (status.ok()) {
+ that->DoWrite(std::move(on_writable), data);
+ } else {
+ on_writable(status);
+ }
+ },
+ false /* is_permanent*/));
+ return;
+ }
+ }
+ on_writable(absl::OkStatus());
+}
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+#endif // GPR_APPLE
diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
new file mode 100644
index 00000000000..832efda4412
--- /dev/null
+++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
@@ -0,0 +1,146 @@
+// Copyright 2023 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
+#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
+#include
+
+#ifdef GPR_APPLE
+
+#include
+
+#include "absl/strings/str_format.h"
+
+#include
+
+#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
+#include "src/core/lib/event_engine/cf_engine/cftype_unique_ref.h"
+#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
+#include "src/core/lib/event_engine/tcp_socket_utils.h"
+#include "src/core/lib/gprpp/host_port.h"
+#include "src/core/lib/gprpp/ref_counted.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+
+namespace grpc_event_engine {
+namespace experimental {
+
+class CFStreamEndpointImpl
+ : public grpc_core::RefCounted {
+ public:
+ CFStreamEndpointImpl(std::shared_ptr engine,
+ MemoryAllocator memory_allocator);
+ ~CFStreamEndpointImpl();
+
+ void Shutdown();
+
+ bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer,
+ const EventEngine::Endpoint::ReadArgs* args);
+ bool Write(absl::AnyInvocable on_writable,
+ SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args);
+
+ const EventEngine::ResolvedAddress& GetPeerAddress() const {
+ return peer_address_;
+ }
+ const EventEngine::ResolvedAddress& GetLocalAddress() const {
+ return local_address_;
+ }
+
+ public:
+ void Connect(absl::AnyInvocable on_connect,
+ EventEngine::ResolvedAddress addr);
+ bool CancelConnect(absl::Status status);
+
+ private:
+ void DoWrite(absl::AnyInvocable on_writable,
+ SliceBuffer* data);
+ void DoRead(absl::AnyInvocable on_read,
+ SliceBuffer* buffer);
+
+ private:
+ static void* Retain(void* info) {
+ auto that = static_cast(info);
+ return that->Ref().release();
+ }
+
+ static void Release(void* info) {
+ auto that = static_cast(info);
+ that->Unref();
+ }
+
+ static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+
+ private:
+ CFTypeUniqueRef cf_read_stream_;
+ CFTypeUniqueRef cf_write_stream_;
+
+ std::shared_ptr engine_;
+
+ EventEngine::ResolvedAddress peer_address_;
+ EventEngine::ResolvedAddress local_address_;
+ std::string peer_address_string_;
+ std::string local_address_string_;
+ MemoryAllocator memory_allocator_;
+
+ LockfreeEvent open_event_;
+ LockfreeEvent read_event_;
+ LockfreeEvent write_event_;
+};
+
+class CFStreamEndpoint : public EventEngine::Endpoint {
+ public:
+ CFStreamEndpoint(std::shared_ptr engine,
+ MemoryAllocator memory_allocator) {
+ impl_ = grpc_core::MakeRefCounted(
+ std::move(engine), std::move(memory_allocator));
+ }
+ ~CFStreamEndpoint() override { impl_->Shutdown(); }
+
+ bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer,
+ const ReadArgs* args) override {
+ return impl_->Read(std::move(on_read), buffer, args);
+ }
+ bool Write(absl::AnyInvocable on_writable,
+ SliceBuffer* data, const WriteArgs* args) override {
+ return impl_->Write(std::move(on_writable), data, args);
+ }
+
+ const EventEngine::ResolvedAddress& GetPeerAddress() const override {
+ return impl_->GetPeerAddress();
+ }
+ const EventEngine::ResolvedAddress& GetLocalAddress() const override {
+ return impl_->GetLocalAddress();
+ }
+
+ public:
+ void Connect(absl::AnyInvocable on_connect,
+ EventEngine::ResolvedAddress addr) {
+ impl_->Connect(std::move(on_connect), std::move(addr));
+ }
+ bool CancelConnect(absl::Status status) {
+ return impl_->CancelConnect(std::move(status));
+ }
+
+ private:
+ grpc_core::RefCountedPtr impl_;
+};
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+#endif // GPR_APPLE
+
+#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H
diff --git a/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
new file mode 100644
index 00000000000..c7915170f62
--- /dev/null
+++ b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
@@ -0,0 +1,79 @@
+// Copyright 2023 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H
+#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H
+#include
+
+#ifdef GPR_APPLE
+
+#include
+
+namespace grpc_event_engine {
+namespace experimental {
+
+template
+class CFTypeUniqueRef {
+ static_assert(std::is_convertible::value,
+ "T should be `CFXxxRef` type");
+
+ public:
+ /* implicit */
+ CFTypeUniqueRef(T cf_type_ref = nullptr) : cf_type_ref_(cf_type_ref) {}
+ ~CFTypeUniqueRef() { reset(); }
+
+ CFTypeUniqueRef(CFTypeUniqueRef const&) = delete;
+ CFTypeUniqueRef& operator=(CFTypeUniqueRef const&) = delete;
+
+ CFTypeUniqueRef(CFTypeUniqueRef&& other) : cf_type_ref_(other.release()){};
+ CFTypeUniqueRef& operator=(CFTypeUniqueRef&& other) {
+ reset(other.release());
+ return *this;
+ }
+
+ operator T() { return cf_type_ref_; }
+
+ // Note: this is for passing a CFTypeRef as output parameter to a CF API, the
+ // current ref is released (if any) regardless of whether new value is set
+ T* operator&() {
+ reset();
+ return &cf_type_ref_;
+ }
+
+ T release() {
+ T old = cf_type_ref_;
+ cf_type_ref_ = nullptr;
+ return old;
+ }
+
+ void reset(T other = nullptr) {
+ if (cf_type_ref_ == other) {
+ return;
+ }
+ T old = cf_type_ref_;
+ cf_type_ref_ = other;
+ if (old) {
+ CFRelease(old);
+ }
+ }
+
+ private:
+ T cf_type_ref_;
+};
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+#endif // GPR_APPLE
+
+#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H
diff --git a/src/core/lib/event_engine/default_event_engine_factory.cc b/src/core/lib/event_engine/default_event_engine_factory.cc
index a4311bc9668..10bfe898650 100644
--- a/src/core/lib/event_engine/default_event_engine_factory.cc
+++ b/src/core/lib/event_engine/default_event_engine_factory.cc
@@ -20,7 +20,7 @@
#include
-#ifdef GPR_WINDOWS
+#if defined(GPR_WINDOWS)
#include "src/core/lib/event_engine/windows/windows_engine.h"
namespace grpc_event_engine {
@@ -32,7 +32,19 @@ std::unique_ptr DefaultEventEngineFactory() {
} // namespace experimental
} // namespace grpc_event_engine
-#else // not GPR_WINDOWS
+#elif defined(GRPC_CFSTREAM)
+#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
+
+namespace grpc_event_engine {
+namespace experimental {
+
+std::unique_ptr DefaultEventEngineFactory() {
+ return std::make_unique();
+}
+
+} // namespace experimental
+} // namespace grpc_event_engine
+#else
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
namespace grpc_event_engine {
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
index 434a0199e8a..2978139b606 100644
--- a/src/core/lib/iomgr/cfstream_handle.cc
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -188,7 +188,7 @@ void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
- gpr_log(GPR_DEBUG,
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
reason, val, val - 1);
}
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
index edfea7c802f..673a0b9f368 100644
--- a/src/core/lib/iomgr/endpoint_cfstream.cc
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -240,7 +240,7 @@ static void WriteAction(void* arg, grpc_error_handle error) {
}
static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, bool urgent,
+ grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast(ep);
if (grpc_tcp_trace.enabled()) {
@@ -258,7 +258,8 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb, void* arg, int /*max_frame_size*/) {
+ grpc_closure* cb, void* /*arg*/,
+ int /*max_frame_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
@@ -304,14 +305,15 @@ absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
return ep_impl->local_address;
}
-int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
+int CFStreamGetFD(grpc_endpoint* /*ep*/) { return 0; }
-bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; }
+bool CFStreamCanTrackErr(grpc_endpoint* /*ep*/) { return false; }
-void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
-void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
-void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
- grpc_pollset_set* pollset) {}
+void CFStreamAddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {}
+void CFStreamAddToPollsetSet(grpc_endpoint* /*ep*/,
+ grpc_pollset_set* /*pollset*/) {}
+void CFStreamDeleteFromPollsetSet(grpc_endpoint* /*ep*/,
+ grpc_pollset_set* /*pollset*/) {}
static const grpc_endpoint_vtable vtable = {CFStreamRead,
CFStreamWrite,
diff --git a/src/core/lib/iomgr/ev_apple.cc b/src/core/lib/iomgr/ev_apple.cc
index efc7639ac91..ba6c21d670c 100644
--- a/src/core/lib/iomgr/ev_apple.cc
+++ b/src/core/lib/iomgr/ev_apple.cc
@@ -118,7 +118,7 @@ static void grpc_apple_register_write_stream_queue(
/// be issued to the run loop when a network event happens and will be driven by
/// the global run loop thread gGlobalRunLoopThread.
static void grpc_apple_register_read_stream_run_loop(
- CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
+ CFReadStreamRef read_stream, dispatch_queue_t /*dispatch_queue*/) {
GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
@@ -131,7 +131,7 @@ static void grpc_apple_register_read_stream_run_loop(
/// be issued to the run loop when a network event happens, and will be driven
/// by the global run loop thread gGlobalRunLoopThread.
static void grpc_apple_register_write_stream_run_loop(
- CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
+ CFWriteStreamRef write_stream, dispatch_queue_t /*dispatch_queue*/) {
GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
CFWriteStreamScheduleWithRunLoop(
@@ -163,7 +163,7 @@ void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
/// Drive the run loop in a global singleton thread until the global run loop is
/// shutdown.
-static void GlobalRunLoopFunc(void* arg) {
+static void GlobalRunLoopFunc(void* /*arg*/) {
grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
gGlobalRunLoopContext->init_cv.Signal();
@@ -342,15 +342,15 @@ grpc_pollset_vtable grpc_apple_pollset_vtable = {
// pollset_set implementation
grpc_pollset_set* pollset_set_create(void) { return nullptr; }
-void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
-void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
-void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
-void pollset_set_add_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
-void pollset_set_del_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
+void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {}
+void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/,
+ grpc_pollset* /*pollset*/) {}
+void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/,
+ grpc_pollset* /*pollset*/) {}
+void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
+ grpc_pollset_set* /*item*/) {}
+void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
+ grpc_pollset_set* /*item*/) {}
grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
pollset_set_create, pollset_set_destroy,
diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
index ddb514000fe..8e2d2186664 100644
--- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc
+++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
@@ -73,7 +73,7 @@ static bool apple_iomgr_platform_is_any_background_poller_thread(void) {
}
static bool apple_iomgr_platform_add_closure_to_background_poller(
- grpc_closure* closure, grpc_error_handle error) {
+ grpc_closure* /*closure*/, grpc_error_handle /*error*/) {
return false;
}
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index 7be3637ac42..09e7c5d799a 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -33,6 +33,7 @@
#include
#include "src/core/lib/address_utils/sockaddr_utils.h"
+#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/cfstream_handle.h"
@@ -40,6 +41,7 @@
#include "src/core/lib/iomgr/endpoint_cfstream.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/error_cfstream.h"
+#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
@@ -149,9 +151,14 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr,
static int64_t CFStreamClientConnect(
grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_event_engine::experimental::EndpointConfig& /*config*/,
+ grpc_pollset_set* /*interested_parties*/,
+ const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) {
+ if (grpc_event_engine::experimental::UseEventEngineClient()) {
+ return grpc_event_engine::experimental::event_engine_tcp_client_connect(
+ closure, ep, config, resolved_addr, deadline);
+ }
+
auto addr_uri = grpc_sockaddr_to_uri(resolved_addr);
if (!addr_uri.ok()) {
grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString());
@@ -198,7 +205,11 @@ static int64_t CFStreamClientConnect(
return 0;
}
-static bool CFStreamClientCancelConnect(int64_t /*connection_handle*/) {
+static bool CFStreamClientCancelConnect(int64_t connection_handle) {
+ if (grpc_event_engine::experimental::UseEventEngineClient()) {
+ return grpc_event_engine::experimental::
+ event_engine_tcp_client_cancel_connect(connection_handle);
+ }
return false;
}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index e6c288401fd..151465b521b 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -488,6 +488,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/debug/stats.cc',
'src/core/lib/debug/stats_data.cc',
'src/core/lib/debug/trace.cc',
+ 'src/core/lib/event_engine/cf_engine/cf_engine.cc',
+ 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc',
'src/core/lib/event_engine/channel_args_endpoint_config.cc',
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
diff --git a/test/core/event_engine/cf/BUILD b/test/core/event_engine/cf/BUILD
new file mode 100644
index 00000000000..9bc735bc860
--- /dev/null
+++ b/test/core/event_engine/cf/BUILD
@@ -0,0 +1,34 @@
+# Copyright 2023 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load("//bazel:grpc_build_system.bzl", "grpc_cc_test")
+
+licenses(["notice"])
+
+grpc_cc_test(
+ name = "cf_engine_test",
+ timeout = "short",
+ srcs = ["cf_engine_test.cc"],
+ external_deps = ["gtest"],
+ language = "C++",
+ tags = [
+ "no_linux",
+ "no_windows",
+ ],
+ deps = [
+ "//:gpr_platform",
+ "//src/core:cf_event_engine",
+ "//test/core/util:grpc_test_util",
+ ],
+)
diff --git a/test/core/event_engine/cf/cf_engine_test.cc b/test/core/event_engine/cf/cf_engine_test.cc
new file mode 100644
index 00000000000..f50b40f82fa
--- /dev/null
+++ b/test/core/event_engine/cf/cf_engine_test.cc
@@ -0,0 +1,96 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include
+
+#ifdef GPR_APPLE
+
+#include
+
+#include "absl/status/status.h"
+#include "gtest/gtest.h"
+
+#include
+#include
+
+#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
+#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
+#include "src/core/lib/event_engine/tcp_socket_utils.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
+#include "test/core/util/port.h"
+
+using namespace std::chrono_literals;
+
+namespace grpc_event_engine {
+namespace experimental {
+
+TEST(CFEventEngineTest, TestConnectionTimeout) {
+ // use a non-routable IP so connection will timeout
+ auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234");
+ GPR_ASSERT(resolved_addr.ok());
+
+ grpc_core::MemoryQuota memory_quota("cf_engine_test");
+ grpc_core::Notification client_signal;
+ auto cf_engine = std::make_shared();
+
+ ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set(
+ GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default()));
+ cf_engine->Connect(
+ [&client_signal](auto endpoint) {
+ EXPECT_EQ(endpoint.status().code(),
+ absl::StatusCode::kDeadlineExceeded);
+ client_signal.Notify();
+ },
+ *resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1ms);
+
+ client_signal.WaitForNotification();
+}
+
+TEST(CFEventEngineTest, TestConnectionCancelled) {
+ // use a non-routable IP so to cancel connection before timeout
+ auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234");
+ GPR_ASSERT(resolved_addr.ok());
+
+ grpc_core::MemoryQuota memory_quota("cf_engine_test");
+ grpc_core::Notification client_signal;
+ auto cf_engine = std::make_shared();
+
+ ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set(
+ GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default()));
+ auto conn_handle = cf_engine->Connect(
+ [&client_signal](auto endpoint) {
+ EXPECT_EQ(endpoint.status().code(), absl::StatusCode::kCancelled);
+ client_signal.Notify();
+ },
+ *resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1h);
+
+ cf_engine->CancelConnect(conn_handle);
+ client_signal.WaitForNotification();
+}
+
+} // namespace experimental
+} // namespace grpc_event_engine
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc_init();
+ int status = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return status;
+}
+
+#else // not GPR_APPLE
+int main(int /* argc */, char** /* argv */) { return 0; }
+#endif
diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD
index 0d8ee42474b..6576c2157eb 100644
--- a/test/core/event_engine/posix/BUILD
+++ b/test/core/event_engine/posix/BUILD
@@ -224,6 +224,7 @@ grpc_cc_test(
external_deps = ["gtest"],
language = "C++",
tags = [
+ "no_mac",
"no_windows",
],
uses_event_engine = True,
diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD
index 620380bd94c..8e8592896cf 100644
--- a/test/core/event_engine/test_suite/BUILD
+++ b/test/core/event_engine/test_suite/BUILD
@@ -40,6 +40,7 @@ grpc_cc_test(
name = "posix_event_engine_test",
srcs = ["posix_event_engine_test.cc"],
tags = [
+ "no_mac",
"no_windows",
],
uses_event_engine = True,
@@ -98,9 +99,11 @@ grpc_cc_test(
"no_linux",
"no_windows",
],
- uses_polling = False,
+ uses_polling = True,
deps = [
"//src/core:cf_event_engine",
+ "//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",
+ "//test/core/event_engine/test_suite/tests:client",
"//test/core/event_engine/test_suite/tests:timer",
],
)
diff --git a/test/core/event_engine/test_suite/cf_event_engine_test.cc b/test/core/event_engine/test_suite/cf_event_engine_test.cc
index 1f3c800902c..1d222514cb6 100644
--- a/test/core/event_engine/test_suite/cf_event_engine_test.cc
+++ b/test/core/event_engine/test_suite/cf_event_engine_test.cc
@@ -19,6 +19,8 @@
#include "src/core/lib/event_engine/cf_engine/cf_engine.h"
#include "test/core/event_engine/test_suite/event_engine_test_framework.h"
+#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h"
+#include "test/core/event_engine/test_suite/tests/client_test.h"
#include "test/core/event_engine/test_suite/tests/timer_test.h"
#include "test/core/util/test_config.h"
@@ -28,8 +30,15 @@ int main(int argc, char** argv) {
auto factory = []() {
return std::make_unique();
};
- SetEventEngineFactories(factory, factory);
+ auto oracle_factory = []() {
+ return std::make_unique<
+ grpc_event_engine::experimental::PosixOracleEventEngine>();
+ };
+ SetEventEngineFactories(factory, oracle_factory);
grpc_event_engine::experimental::InitTimerTests();
+ grpc_event_engine::experimental::InitClientTests();
+ // TODO(vigneshbabu): remove when the experiment is over
+ grpc_core::ForceEnableExperiment("event_engine_client", true);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 91d92daee1a..eaed279d562 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -2052,6 +2052,11 @@ src/core/lib/debug/stats_data.cc \
src/core/lib/debug/stats_data.h \
src/core/lib/debug/trace.cc \
src/core/lib/debug/trace.h \
+src/core/lib/event_engine/cf_engine/cf_engine.cc \
+src/core/lib/event_engine/cf_engine/cf_engine.h \
+src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \
+src/core/lib/event_engine/cf_engine/cfstream_endpoint.h \
+src/core/lib/event_engine/cf_engine/cftype_unique_ref.h \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/channel_args_endpoint_config.h \
src/core/lib/event_engine/common_closures.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 310e775884f..b54798074d0 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1830,6 +1830,11 @@ src/core/lib/debug/stats_data.cc \
src/core/lib/debug/stats_data.h \
src/core/lib/debug/trace.cc \
src/core/lib/debug/trace.h \
+src/core/lib/event_engine/cf_engine/cf_engine.cc \
+src/core/lib/event_engine/cf_engine/cf_engine.h \
+src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \
+src/core/lib/event_engine/cf_engine/cfstream_endpoint.h \
+src/core/lib/event_engine/cf_engine/cftype_unique_ref.h \
src/core/lib/event_engine/channel_args_endpoint_config.cc \
src/core/lib/event_engine/channel_args_endpoint_config.h \
src/core/lib/event_engine/common_closures.h \
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index e8eed2dc675..60440ebed29 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -1345,6 +1345,28 @@
],
"uses_polling": true
},
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
+ "name": "cf_engine_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix"
+ ],
+ "uses_polling": true
+ },
{
"args": [],
"benchmark": false,
@@ -1365,7 +1387,7 @@
"mac",
"posix"
],
- "uses_polling": false
+ "uses_polling": true
},
{
"args": [],
@@ -5546,7 +5568,6 @@
"benchmark": false,
"ci_platforms": [
"linux",
- "mac",
"posix"
],
"cpu_cost": 1.0,
@@ -5558,7 +5579,6 @@
"name": "posix_event_engine_connect_test",
"platforms": [
"linux",
- "mac",
"posix"
],
"uses_polling": true
@@ -5568,7 +5588,6 @@
"benchmark": false,
"ci_platforms": [
"linux",
- "mac",
"posix"
],
"cpu_cost": 1.0,
@@ -5580,7 +5599,6 @@
"name": "posix_event_engine_test",
"platforms": [
"linux",
- "mac",
"posix"
],
"uses_polling": true