From ad2a5dd35510a41480c5cd7c6f5603a2fcb2a1f7 Mon Sep 17 00:00:00 2001 From: Hannah Shi Date: Thu, 11 May 2023 08:13:41 -0700 Subject: [PATCH] [ObjC] Cf event engine client (#33034) Added `//:gpr_platform` to cf_engine_test to fix build_cleaner check in the previous merge. More details in https://github.com/grpc/grpc/pull/33027 --- BUILD | 22 +- CMakeLists.txt | 61 ++- Makefile | 4 + build_autogenerated.yaml | 42 ++- config.m4 | 3 + config.w32 | 3 + gRPC-C++.podspec | 6 + gRPC-Core.podspec | 8 + grpc.gemspec | 5 + grpc.gyp | 6 + package.xml | 5 + src/core/BUILD | 35 +- .../lib/event_engine/cf_engine/cf_engine.cc | 96 ++++- .../lib/event_engine/cf_engine/cf_engine.h | 17 +- .../cf_engine/cfstream_endpoint.cc | 354 ++++++++++++++++++ .../cf_engine/cfstream_endpoint.h | 146 ++++++++ .../cf_engine/cftype_unique_ref.h | 79 ++++ .../default_event_engine_factory.cc | 16 +- src/core/lib/iomgr/cfstream_handle.cc | 2 +- src/core/lib/iomgr/endpoint_cfstream.cc | 18 +- src/core/lib/iomgr/ev_apple.cc | 24 +- src/core/lib/iomgr/iomgr_posix_cfstream.cc | 2 +- src/core/lib/iomgr/tcp_client_cfstream.cc | 17 +- src/python/grpcio/grpc_core_dependencies.py | 2 + test/core/event_engine/cf/BUILD | 34 ++ test/core/event_engine/cf/cf_engine_test.cc | 96 +++++ test/core/event_engine/posix/BUILD | 1 + test/core/event_engine/test_suite/BUILD | 5 +- .../test_suite/cf_event_engine_test.cc | 11 +- tools/doxygen/Doxyfile.c++.internal | 5 + tools/doxygen/Doxyfile.core.internal | 5 + tools/run_tests/generated/tests.json | 28 +- 32 files changed, 1093 insertions(+), 65 deletions(-) create mode 100644 src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc create mode 100644 src/core/lib/event_engine/cf_engine/cfstream_endpoint.h create mode 100644 src/core/lib/event_engine/cf_engine/cftype_unique_ref.h create mode 100644 test/core/event_engine/cf/BUILD create mode 100644 test/core/event_engine/cf/cf_engine_test.cc diff --git a/BUILD b/BUILD index ab6cdc11d98..22966d7268a 100644 --- a/BUILD +++ b/BUILD @@ -86,11 +86,26 @@ config_setting( values = {"crosstool_top": "//external:android/crosstool"}, ) +config_setting( + name = "macos", + values = {"apple_platform_type": "macos"}, +) + config_setting( name = "ios", values = {"apple_platform_type": "ios"}, ) +config_setting( + name = "tvos", + values = {"apple_platform_type": "tvos"}, +) + +config_setting( + name = "watchos", + values = {"apple_platform_type": "watchos"}, +) + config_setting( name = "systemd", values = {"define": "use_systemd=true"}, @@ -174,10 +189,15 @@ config_setting( ) config_setting( - name = "mac_x86_64", + name = "mac", values = {"cpu": "darwin"}, ) +config_setting( + name = "mac_x86_64", + values = {"cpu": "darwin_x86_64"}, +) + config_setting( name = "mac_arm64", values = {"cpu": "darwin_arm64"}, diff --git a/CMakeLists.txt b/CMakeLists.txt index 7546d63d75d..b0f940c7d65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -863,6 +863,9 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx cel_authorization_engine_test) add_dependencies(buildtests_cxx certificate_provider_registry_test) add_dependencies(buildtests_cxx certificate_provider_store_test) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx cf_engine_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx cf_event_engine_test) endif() @@ -1107,10 +1110,10 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_engine_listener_utils_test) endif() - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_event_engine_connect_test) endif() - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_event_engine_test) endif() add_dependencies(buildtests_cxx prioritized_race_test) @@ -2047,6 +2050,8 @@ add_library(grpc src/core/lib/debug/stats.cc src/core/lib/debug/stats_data.cc src/core/lib/debug/trace.cc + src/core/lib/event_engine/cf_engine/cf_engine.cc + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc @@ -2745,6 +2750,8 @@ add_library(grpc_unsecure src/core/lib/debug/stats.cc src/core/lib/debug/stats_data.cc src/core/lib/debug/trace.cc + src/core/lib/event_engine/cf_engine/cf_engine.cc + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc @@ -4275,6 +4282,8 @@ add_library(grpc_authorization_provider src/core/lib/debug/stats.cc src/core/lib/debug/stats_data.cc src/core/lib/debug/trace.cc + src/core/lib/event_engine/cf_engine/cf_engine.cc + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc @@ -7470,15 +7479,55 @@ target_link_libraries(certificate_provider_store_test ) +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(cf_engine_test + test/core/event_engine/cf/cf_engine_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + target_compile_features(cf_engine_test PUBLIC cxx_std_14) + target_include_directories(cf_engine_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(cf_engine_test + ${_gRPC_BASELIB_LIBRARIES} + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ZLIB_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + +endif() endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_executable(cf_event_engine_test - src/core/lib/event_engine/cf_engine/cf_engine.cc test/core/event_engine/event_engine_test_utils.cc test/core/event_engine/test_suite/cf_event_engine_test.cc test/core/event_engine/test_suite/event_engine_test_framework.cc + test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc + test/core/event_engine/test_suite/tests/client_test.cc test/core/event_engine/test_suite/tests/timer_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -11464,6 +11513,8 @@ add_executable(frame_test src/core/lib/debug/stats.cc src/core/lib/debug/stats_data.cc src/core/lib/debug/trace.cc + src/core/lib/event_engine/cf_engine/cf_engine.cc + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc src/core/lib/event_engine/channel_args_endpoint_config.cc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc @@ -16666,7 +16717,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) endif() endif() if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_executable(posix_event_engine_connect_test test/core/event_engine/event_engine_test_utils.cc @@ -16709,7 +16760,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) endif() endif() if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_executable(posix_event_engine_test test/core/event_engine/event_engine_test_utils.cc diff --git a/Makefile b/Makefile index 6a3791ae0ce..1dde9ff0511 100644 --- a/Makefile +++ b/Makefile @@ -1432,6 +1432,8 @@ LIBGRPC_SRC = \ src/core/lib/debug/stats.cc \ src/core/lib/debug/stats_data.cc \ src/core/lib/debug/trace.cc \ + src/core/lib/event_engine/cf_engine/cf_engine.cc \ + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ @@ -1984,6 +1986,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/debug/stats.cc \ src/core/lib/debug/stats_data.cc \ src/core/lib/debug/trace.cc \ + src/core/lib/event_engine/cf_engine/cf_engine.cc \ + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 5ed94d2f898..f723640d935 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -676,6 +676,9 @@ libs: - src/core/lib/debug/stats.h - src/core/lib/debug/stats_data.h - src/core/lib/debug/trace.h + - src/core/lib/event_engine/cf_engine/cf_engine.h + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h + - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h - src/core/lib/event_engine/channel_args_endpoint_config.h - src/core/lib/event_engine/common_closures.h - src/core/lib/event_engine/default_event_engine.h @@ -1476,6 +1479,8 @@ libs: - src/core/lib/debug/stats.cc - src/core/lib/debug/stats_data.cc - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/cf_engine/cf_engine.cc + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc @@ -2046,6 +2051,9 @@ libs: - src/core/lib/debug/stats.h - src/core/lib/debug/stats_data.h - src/core/lib/debug/trace.h + - src/core/lib/event_engine/cf_engine/cf_engine.h + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h + - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h - src/core/lib/event_engine/channel_args_endpoint_config.h - src/core/lib/event_engine/common_closures.h - src/core/lib/event_engine/default_event_engine.h @@ -2458,6 +2466,8 @@ libs: - src/core/lib/debug/stats.cc - src/core/lib/debug/stats_data.cc - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/cf_engine/cf_engine.cc + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc @@ -3536,6 +3546,9 @@ libs: - src/core/lib/debug/stats.h - src/core/lib/debug/stats_data.h - src/core/lib/debug/trace.h + - src/core/lib/event_engine/cf_engine/cf_engine.h + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h + - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h - src/core/lib/event_engine/channel_args_endpoint_config.h - src/core/lib/event_engine/common_closures.h - src/core/lib/event_engine/default_event_engine.h @@ -3827,6 +3840,8 @@ libs: - src/core/lib/debug/stats.cc - src/core/lib/debug/stats_data.cc - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/cf_engine/cf_engine.cc + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc @@ -5440,20 +5455,35 @@ targets: - test/core/xds/certificate_provider_store_test.cc deps: - grpc_test_util +- name: cf_engine_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/event_engine/cf/cf_engine_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac - name: cf_event_engine_test gtest: true build: test language: c++ headers: - - src/core/lib/event_engine/cf_engine/cf_engine.h - test/core/event_engine/event_engine_test_utils.h - test/core/event_engine/test_suite/event_engine_test_framework.h + - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h + - test/core/event_engine/test_suite/tests/client_test.h - test/core/event_engine/test_suite/tests/timer_test.h src: - - src/core/lib/event_engine/cf_engine/cf_engine.cc - test/core/event_engine/event_engine_test_utils.cc - test/core/event_engine/test_suite/cf_event_engine_test.cc - test/core/event_engine/test_suite/event_engine_test_framework.cc + - test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc + - test/core/event_engine/test_suite/tests/client_test.cc - test/core/event_engine/test_suite/tests/timer_test.cc deps: - grpc_unsecure @@ -5462,7 +5492,6 @@ targets: - linux - posix - mac - uses_polling: false - name: cfstream_test gtest: true build: test @@ -7450,6 +7479,9 @@ targets: - src/core/lib/debug/stats.h - src/core/lib/debug/stats_data.h - src/core/lib/debug/trace.h + - src/core/lib/event_engine/cf_engine/cf_engine.h + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.h + - src/core/lib/event_engine/cf_engine/cftype_unique_ref.h - src/core/lib/event_engine/channel_args_endpoint_config.h - src/core/lib/event_engine/common_closures.h - src/core/lib/event_engine/default_event_engine.h @@ -7723,6 +7755,8 @@ targets: - src/core/lib/debug/stats.cc - src/core/lib/debug/stats_data.cc - src/core/lib/debug/trace.cc + - src/core/lib/event_engine/cf_engine/cf_engine.cc + - src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc - src/core/lib/event_engine/channel_args_endpoint_config.cc - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc @@ -10128,7 +10162,6 @@ targets: platforms: - linux - posix - - mac - name: posix_event_engine_test gtest: true build: test @@ -10154,7 +10187,6 @@ targets: platforms: - linux - posix - - mac - name: prioritized_race_test gtest: true build: test diff --git a/config.m4 b/config.m4 index 93493c33f11..058372d3705 100644 --- a/config.m4 +++ b/config.m4 @@ -514,6 +514,8 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/debug/stats.cc \ src/core/lib/debug/stats_data.cc \ src/core/lib/debug/trace.cc \ + src/core/lib/event_engine/cf_engine/cf_engine.cc \ + src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ @@ -1458,6 +1460,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/config) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/debug) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/cf_engine) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/posix_engine) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/thread_pool) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/event_engine/windows) diff --git a/config.w32 b/config.w32 index 2c38b5d04b7..e921e465fc0 100644 --- a/config.w32 +++ b/config.w32 @@ -479,6 +479,8 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\debug\\stats.cc " + "src\\core\\lib\\debug\\stats_data.cc " + "src\\core\\lib\\debug\\trace.cc " + + "src\\core\\lib\\event_engine\\cf_engine\\cf_engine.cc " + + "src\\core\\lib\\event_engine\\cf_engine\\cfstream_endpoint.cc " + "src\\core\\lib\\event_engine\\channel_args_endpoint_config.cc " + "src\\core\\lib\\event_engine\\default_event_engine.cc " + "src\\core\\lib\\event_engine\\default_event_engine_factory.cc " + @@ -1590,6 +1592,7 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\config"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\debug"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\cf_engine"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\posix_engine"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\thread_pool"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\event_engine\\windows"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 504de1898b8..516786acfa4 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -746,6 +746,9 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats.h', 'src/core/lib/debug/stats_data.h', 'src/core/lib/debug/trace.h', + 'src/core/lib/event_engine/cf_engine/cf_engine.h', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h', + 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h', 'src/core/lib/event_engine/channel_args_endpoint_config.h', 'src/core/lib/event_engine/common_closures.h', 'src/core/lib/event_engine/default_event_engine.h', @@ -1779,6 +1782,9 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats.h', 'src/core/lib/debug/stats_data.h', 'src/core/lib/debug/trace.h', + 'src/core/lib/event_engine/cf_engine/cf_engine.h', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h', + 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h', 'src/core/lib/event_engine/channel_args_endpoint_config.h', 'src/core/lib/event_engine/common_closures.h', 'src/core/lib/event_engine/default_event_engine.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3cb76b1d17c..22e82789fbc 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1150,6 +1150,11 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats_data.h', 'src/core/lib/debug/trace.cc', 'src/core/lib/debug/trace.h', + 'src/core/lib/event_engine/cf_engine/cf_engine.cc', + 'src/core/lib/event_engine/cf_engine/cf_engine.h', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h', + 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h', 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/channel_args_endpoint_config.h', 'src/core/lib/event_engine/common_closures.h', @@ -2502,6 +2507,9 @@ Pod::Spec.new do |s| 'src/core/lib/debug/stats.h', 'src/core/lib/debug/stats_data.h', 'src/core/lib/debug/trace.h', + 'src/core/lib/event_engine/cf_engine/cf_engine.h', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.h', + 'src/core/lib/event_engine/cf_engine/cftype_unique_ref.h', 'src/core/lib/event_engine/channel_args_endpoint_config.h', 'src/core/lib/event_engine/common_closures.h', 'src/core/lib/event_engine/default_event_engine.h', diff --git a/grpc.gemspec b/grpc.gemspec index efff2618f5c..2455d7bd126 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1056,6 +1056,11 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/debug/stats_data.h ) s.files += %w( src/core/lib/debug/trace.cc ) s.files += %w( src/core/lib/debug/trace.h ) + s.files += %w( src/core/lib/event_engine/cf_engine/cf_engine.cc ) + s.files += %w( src/core/lib/event_engine/cf_engine/cf_engine.h ) + s.files += %w( src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc ) + s.files += %w( src/core/lib/event_engine/cf_engine/cfstream_endpoint.h ) + s.files += %w( src/core/lib/event_engine/cf_engine/cftype_unique_ref.h ) s.files += %w( src/core/lib/event_engine/channel_args_endpoint_config.cc ) s.files += %w( src/core/lib/event_engine/channel_args_endpoint_config.h ) s.files += %w( src/core/lib/event_engine/common_closures.h ) diff --git a/grpc.gyp b/grpc.gyp index 6aca97209e5..749984e59bb 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -735,6 +735,8 @@ 'src/core/lib/debug/stats.cc', 'src/core/lib/debug/stats_data.cc', 'src/core/lib/debug/trace.cc', + 'src/core/lib/event_engine/cf_engine/cf_engine.cc', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc', 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', @@ -1226,6 +1228,8 @@ 'src/core/lib/debug/stats.cc', 'src/core/lib/debug/stats_data.cc', 'src/core/lib/debug/trace.cc', + 'src/core/lib/event_engine/cf_engine/cf_engine.cc', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc', 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', @@ -1741,6 +1745,8 @@ 'src/core/lib/debug/stats.cc', 'src/core/lib/debug/stats_data.cc', 'src/core/lib/debug/trace.cc', + 'src/core/lib/event_engine/cf_engine/cf_engine.cc', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc', 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', diff --git a/package.xml b/package.xml index 0ac426b684b..c3d78e43d21 100644 --- a/package.xml +++ b/package.xml @@ -1038,6 +1038,11 @@ + + + + + diff --git a/src/core/BUILD b/src/core/BUILD index a53c9117b85..f13ccda5998 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -2097,17 +2097,33 @@ grpc_cc_library( grpc_cc_library( name = "cf_event_engine", - srcs = ["lib/event_engine/cf_engine/cf_engine.cc"], - hdrs = ["lib/event_engine/cf_engine/cf_engine.h"], + srcs = [ + "lib/event_engine/cf_engine/cf_engine.cc", + "lib/event_engine/cf_engine/cfstream_endpoint.cc", + ], + hdrs = [ + "lib/event_engine/cf_engine/cf_engine.h", + "lib/event_engine/cf_engine/cfstream_endpoint.h", + "lib/event_engine/cf_engine/cftype_unique_ref.h", + ], + external_deps = ["absl/strings:str_format"], deps = [ "event_engine_common", + "event_engine_tcp_socket_utils", "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", "init_internally", + "posix_event_engine_closure", + "posix_event_engine_event_poller", + "posix_event_engine_lockfree_event", "posix_event_engine_timer_manager", + "ref_counted", + "strerror", "//:event_engine_base_hdrs", "//:gpr", + "//:ref_counted_ptr", + "//:sockaddr_utils", ], ) @@ -2180,6 +2196,21 @@ grpc_cc_library( "//:windows": ["windows_event_engine"], "//:windows_msvc": ["windows_event_engine"], "//:windows_other": ["windows_event_engine"], + "//:mac": [ + "posix_event_engine", + "cf_event_engine", + ], + "//:mac_x86_64": [ + "posix_event_engine", + "cf_event_engine", + ], + "//:mac_arm64": [ + "posix_event_engine", + "cf_event_engine", + ], + "//:ios": ["cf_event_engine"], + "//:tvos": ["cf_event_engine"], + "//:watchos": ["cf_event_engine"], "//conditions:default": ["posix_event_engine"], }], deps = [ diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.cc b/src/core/lib/event_engine/cf_engine/cf_engine.cc index 86b368f9910..ecd5e4fe218 100644 --- a/src/core/lib/event_engine/cf_engine/cf_engine.cc +++ b/src/core/lib/event_engine/cf_engine/cf_engine.cc @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,10 +16,14 @@ #ifdef GPR_APPLE +#include + #include #include "src/core/lib/event_engine/cf_engine/cf_engine.h" +#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/thread_pool/thread_pool.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/utils.h" @@ -38,7 +42,7 @@ struct CFEventEngine::Closure final : public EventEngine::Closure { GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p executing callback:%s", engine, HandleToString(handle).c_str()); { - grpc_core::MutexLock lock(&engine->mu_); + grpc_core::MutexLock lock(&engine->task_mu_); engine->known_handles_.erase(handle); } cb(); @@ -53,7 +57,7 @@ CFEventEngine::CFEventEngine() CFEventEngine::~CFEventEngine() { { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { for (auto handle : known_handles_) { gpr_log(GPR_ERROR, @@ -77,14 +81,76 @@ CFEventEngine::CreateListener( } CFEventEngine::ConnectionHandle CFEventEngine::Connect( - OnConnectCallback /* on_connect */, const ResolvedAddress& /* addr */, - const EndpointConfig& /* args */, MemoryAllocator /* memory_allocator */, - Duration /* timeout */) { - grpc_core::Crash("unimplemented"); + OnConnectCallback on_connect, const ResolvedAddress& addr, + const EndpointConfig& /* args */, MemoryAllocator memory_allocator, + Duration timeout) { + auto endpoint_ptr = new CFStreamEndpoint( + std::static_pointer_cast(shared_from_this()), + std::move(memory_allocator)); + + ConnectionHandle handle{reinterpret_cast(endpoint_ptr), 0}; + { + grpc_core::MutexLock lock(&conn_mu_); + conn_handles_.insert(handle); + } + + auto deadline_timer = + RunAfter(timeout, [handle, that = std::static_pointer_cast( + shared_from_this())]() { + that->CancelConnectInternal( + handle, absl::DeadlineExceededError("Connect timed out")); + }); + + auto on_connect2 = + [that = std::static_pointer_cast(shared_from_this()), + deadline_timer, handle, + on_connect = std::move(on_connect)](absl::Status status) mutable { + // best effort canceling deadline timer + that->Cancel(deadline_timer); + + { + grpc_core::MutexLock lock(&that->conn_mu_); + that->conn_handles_.erase(handle); + } + + auto endpoint_ptr = reinterpret_cast(handle.keys[0]); + + if (!status.ok()) { + on_connect(std::move(status)); + delete endpoint_ptr; + return; + } + + on_connect(std::unique_ptr(endpoint_ptr)); + }; + + endpoint_ptr->Connect(std::move(on_connect2), addr); + + return handle; } -bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) { - grpc_core::Crash("unimplemented"); +bool CFEventEngine::CancelConnect(ConnectionHandle handle) { + CancelConnectInternal(handle, absl::CancelledError("CancelConnect")); + // on_connect will always be called, even if cancellation is successful + return false; +} + +bool CFEventEngine::CancelConnectInternal(ConnectionHandle handle, + absl::Status status) { + grpc_core::MutexLock lock(&conn_mu_); + + if (!conn_handles_.contains(handle)) { + GRPC_EVENT_ENGINE_TRACE( + "Unknown connection handle: %s", + HandleToString(handle).c_str()); + return false; + } + conn_handles_.erase(handle); + + // keep the `conn_mu_` lock to prevent endpoint_ptr from being deleted + + auto endpoint_ptr = reinterpret_cast(handle.keys[0]); + return endpoint_ptr->CancelConnect(status); } bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); } @@ -94,12 +160,12 @@ std::unique_ptr CFEventEngine::GetDNSResolver( grpc_core::Crash("unimplemented"); } -void CFEventEngine::Run(EventEngine::Closure* /* closure */) { - grpc_core::Crash("unimplemented"); +void CFEventEngine::Run(EventEngine::Closure* closure) { + thread_pool_->Run(closure); } -void CFEventEngine::Run(absl::AnyInvocable /* closure */) { - grpc_core::Crash("unimplemented"); +void CFEventEngine::Run(absl::AnyInvocable closure) { + thread_pool_->Run(std::move(closure)); } EventEngine::TaskHandle CFEventEngine::RunAfter(Duration when, @@ -113,7 +179,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfter( } bool CFEventEngine::Cancel(TaskHandle handle) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (!known_handles_.contains(handle)) return false; auto* cd = reinterpret_cast(handle.keys[0]); bool r = timer_manager_.TimerCancel(&cd->timer); @@ -130,7 +196,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfterInternal( cd->engine = this; EventEngine::TaskHandle handle{reinterpret_cast(cd), aba_token_.fetch_add(1)}; - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); known_handles_.insert(handle); cd->handle = handle; GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p scheduling callback:%s", this, diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.h b/src/core/lib/event_engine/cf_engine/cf_engine.h index 7b9a7f4a804..3e55ce85616 100644 --- a/src/core/lib/event_engine/cf_engine/cf_engine.h +++ b/src/core/lib/event_engine/cf_engine/cf_engine.h @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,9 @@ #include #include "src/core/lib/event_engine/handle_containers.h" +#include "src/core/lib/event_engine/posix_engine/event_poller.h" +#include "src/core/lib/event_engine/posix_engine/lockfree_event.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/init_internally.h" @@ -28,6 +31,7 @@ namespace grpc_event_engine { namespace experimental { class CFEventEngine : public EventEngine, + public Scheduler, public grpc_core::KeepsGrpcInitialized { public: CFEventEngine(); @@ -60,9 +64,16 @@ class CFEventEngine : public EventEngine, struct Closure; EventEngine::TaskHandle RunAfterInternal(Duration when, absl::AnyInvocable cb); - grpc_core::Mutex mu_; - TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); + + bool CancelConnectInternal(ConnectionHandle handle, absl::Status status); + + grpc_core::Mutex task_mu_; + TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_); std::atomic aba_token_{0}; + + grpc_core::Mutex conn_mu_; + ConnectionHandleSet conn_handles_ ABSL_GUARDED_BY(conn_mu_); + std::shared_ptr thread_pool_; TimerManager timer_manager_; }; diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc new file mode 100644 index 00000000000..f77a626d7f5 --- /dev/null +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc @@ -0,0 +1,354 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#ifdef GPR_APPLE + +#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/gprpp/strerror.h" + +namespace grpc_event_engine { +namespace experimental { + +namespace { + +int kDefaultReadBufferSize = 8192; + +absl::Status CFErrorToStatus(CFTypeUniqueRef cf_error) { + if (cf_error == nullptr) { + return absl::OkStatus(); + } + CFErrorDomain cf_domain = CFErrorGetDomain((cf_error)); + CFIndex code = CFErrorGetCode((cf_error)); + CFTypeUniqueRef cf_desc = CFErrorCopyDescription((cf_error)); + char domain_buf[256]; + char desc_buf[256]; + CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8); + CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8); + return absl::Status(absl::StatusCode::kUnknown, + absl::StrFormat("(domain:%s, code:%ld, description:%s)", + domain_buf, code, desc_buf)); +} + +absl::StatusOr CFReadStreamLocallAddress( + CFReadStreamRef stream) { + CFTypeUniqueRef cf_native_handle = static_cast( + CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle)); + CFSocketNativeHandle socket; + CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)), + (UInt8*)&socket); + EventEngine::ResolvedAddress addr; + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; + if (getsockname(socket, const_cast(addr.address()), &len) < 0) { + return absl::InternalError( + absl::StrCat("getsockname:", grpc_core::StrError(errno))); + } + return EventEngine::ResolvedAddress(addr.address(), len); +} + +} // namespace + +bool CFStreamEndpointImpl::CancelConnect(absl::Status status) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE( + "CFStreamEndpointImpl::CancelConnect: status: %s, this: %p", + status.ToString().c_str(), this); + + return open_event_.SetShutdown(std::move(status)); +} + +void CFStreamEndpointImpl::Connect( + absl::AnyInvocable on_connect, + EventEngine::ResolvedAddress addr) { + auto addr_uri = ResolvedAddressToURI(addr); + + if (!addr_uri.ok()) { + on_connect(std::move(addr_uri).status()); + return; + } + + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s", + addr_uri.value().c_str()); + + peer_address_ = std::move(addr); + auto host_port = ResolvedAddressToNormalizedString(peer_address_); + if (!host_port.ok()) { + on_connect(std::move(host_port).status()); + return; + } + + peer_address_string_ = host_port.value(); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE( + "CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str()); + + std::string host_string; + std::string port_string; + grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string); + CFStringRef host = CFStringCreateWithCString(NULL, host_string.c_str(), + kCFStringEncodingUTF8); + int port = ResolvedAddressGetPort(peer_address_); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_, + &cf_write_stream_); + + CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr}; + CFReadStreamSetClient( + cf_read_stream_, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + ReadCallback, &cf_context); + CFWriteStreamSetClient( + cf_write_stream_, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + WriteCallback, &cf_context); + CFReadStreamSetDispatchQueue(cf_read_stream_, + dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); + CFWriteStreamSetDispatchQueue( + cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); + + if (!CFReadStreamOpen(cf_read_stream_)) { + auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); + on_connect(std::move(status)); + return; + } + + if (!CFWriteStreamOpen(cf_write_stream_)) { + auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_)); + on_connect(std::move(status)); + return; + } + + open_event_.NotifyOn(new PosixEngineClosure( + [that = Ref(), + on_connect = std::move(on_connect)](absl::Status status) mutable { + if (!status.ok()) { + on_connect(std::move(status)); + return; + } + + auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_); + if (!local_addr.ok()) { + on_connect(std::move(local_addr).status()); + return; + } + + that->local_address_ = local_addr.value(); + that->local_address_string_ = + *ResolvedAddressToURI(that->local_address_); + on_connect(absl::OkStatus()); + }, + false /* is_permanent */)); +} + +/* static */ void CFStreamEndpointImpl::ReadCallback( + CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info) { + auto self = static_cast(client_callback_info); + + GRPC_EVENT_ENGINE_ENDPOINT_TRACE( + "CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type, self); + + switch (type) { + case kCFStreamEventOpenCompleted: + // wait for write stream open completed to signal connection ready + break; + case kCFStreamEventHasBytesAvailable: + ABSL_FALLTHROUGH_INTENDED; + case kCFStreamEventEndEncountered: + self->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: { + auto status = CFErrorToStatus(CFReadStreamCopyError(stream)); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s", + status.ToString().c_str()); + + self->open_event_.SetShutdown(status); + self->read_event_.SetShutdown(status); + self->write_event_.SetShutdown(status); + } break; + default: + GPR_UNREACHABLE_CODE(return); + } +} + +/* static */ +void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, + void* client_callback_info) { + auto self = static_cast(client_callback_info); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE( + "CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type, self); + + switch (type) { + case kCFStreamEventOpenCompleted: + self->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + ABSL_FALLTHROUGH_INTENDED; + case kCFStreamEventEndEncountered: + self->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: { + auto status = CFErrorToStatus(CFWriteStreamCopyError(stream)); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s", + status.ToString().c_str()); + + self->open_event_.SetShutdown(status); + self->read_event_.SetShutdown(status); + self->write_event_.SetShutdown(status); + } break; + default: + GPR_UNREACHABLE_CODE(return); + } +} + +CFStreamEndpointImpl::CFStreamEndpointImpl( + std::shared_ptr engine, MemoryAllocator memory_allocator) + : engine_(std::move(engine)), + memory_allocator_(std::move(memory_allocator)), + open_event_(engine_.get()), + read_event_(engine_.get()), + write_event_(engine_.get()) { + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); +} + +CFStreamEndpointImpl::~CFStreamEndpointImpl() { + open_event_.DestroyEvent(); + read_event_.DestroyEvent(); + write_event_.DestroyEvent(); +} + +void CFStreamEndpointImpl::Shutdown() { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p", + this); + + auto shutdownStatus = + absl::Status(absl::StatusCode::kUnknown, + absl::StrFormat("Shutting down CFStreamEndpointImpl")); + open_event_.SetShutdown(shutdownStatus); + read_event_.SetShutdown(shutdownStatus); + write_event_.SetShutdown(shutdownStatus); + + CFReadStreamSetClient(cf_read_stream_, kCFStreamEventNone, nullptr, nullptr); + CFWriteStreamSetClient(cf_write_stream_, kCFStreamEventNone, nullptr, + nullptr); + CFReadStreamClose(cf_read_stream_); + CFWriteStreamClose(cf_write_stream_); +} + +bool CFStreamEndpointImpl::Read( + absl::AnyInvocable on_read, SliceBuffer* buffer, + const EventEngine::Endpoint::ReadArgs* /* args */) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p", + this); + + read_event_.NotifyOn(new PosixEngineClosure( + [that = Ref(), on_read = std::move(on_read), + buffer](absl::Status status) mutable { + if (status.ok()) { + that->DoRead(std::move(on_read), buffer); + } else { + on_read(status); + } + }, + false /* is_permanent*/)); + + return false; +} + +void CFStreamEndpointImpl::DoRead( + absl::AnyInvocable on_read, SliceBuffer* buffer) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p", + this); + + auto buffer_index = buffer->AppendIndexed( + Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize))); + + CFIndex read_size = CFReadStreamRead( + cf_read_stream_, + internal::SliceCast(buffer->MutableSliceAt(buffer_index)) + .begin(), + kDefaultReadBufferSize); + + if (read_size < 0) { + auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld", + status.ToString().c_str(), read_size); + on_read(status); + return; + } + + buffer->RemoveLastNBytes(buffer->Length() - read_size); + on_read(absl::OkStatus()); +} + +bool CFStreamEndpointImpl::Write( + absl::AnyInvocable on_writable, SliceBuffer* data, + const EventEngine::Endpoint::WriteArgs* /* args */) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p", + this); + + write_event_.NotifyOn(new PosixEngineClosure( + [that = Ref(), on_writable = std::move(on_writable), + data](absl::Status status) mutable { + if (status.ok()) { + that->DoWrite(std::move(on_writable), data); + } else { + on_writable(status); + } + }, + false /* is_permanent*/)); + + return false; +} + +void CFStreamEndpointImpl::DoWrite( + absl::AnyInvocable on_writable, SliceBuffer* data) { + GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p", + this); + + size_t total_written_size = 0; + for (size_t i = 0; i < data->Count(); i++) { + auto slice = data->RefSlice(i); + size_t written_size = + CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size()); + + total_written_size += written_size; + if (written_size < slice.size()) { + SliceBuffer written; + data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written); + + write_event_.NotifyOn(new PosixEngineClosure( + [that = Ref(), on_writable = std::move(on_writable), + data](absl::Status status) mutable { + if (status.ok()) { + that->DoWrite(std::move(on_writable), data); + } else { + on_writable(status); + } + }, + false /* is_permanent*/)); + return; + } + } + on_writable(absl::OkStatus()); +} + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GPR_APPLE diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h new file mode 100644 index 00000000000..832efda4412 --- /dev/null +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h @@ -0,0 +1,146 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H +#include + +#ifdef GPR_APPLE + +#include + +#include "absl/strings/str_format.h" + +#include + +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/cf_engine/cf_engine.h" +#include "src/core/lib/event_engine/cf_engine/cftype_unique_ref.h" +#include "src/core/lib/event_engine/posix_engine/lockfree_event.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" + +namespace grpc_event_engine { +namespace experimental { + +class CFStreamEndpointImpl + : public grpc_core::RefCounted { + public: + CFStreamEndpointImpl(std::shared_ptr engine, + MemoryAllocator memory_allocator); + ~CFStreamEndpointImpl(); + + void Shutdown(); + + bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const EventEngine::Endpoint::ReadArgs* args); + bool Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args); + + const EventEngine::ResolvedAddress& GetPeerAddress() const { + return peer_address_; + } + const EventEngine::ResolvedAddress& GetLocalAddress() const { + return local_address_; + } + + public: + void Connect(absl::AnyInvocable on_connect, + EventEngine::ResolvedAddress addr); + bool CancelConnect(absl::Status status); + + private: + void DoWrite(absl::AnyInvocable on_writable, + SliceBuffer* data); + void DoRead(absl::AnyInvocable on_read, + SliceBuffer* buffer); + + private: + static void* Retain(void* info) { + auto that = static_cast(info); + return that->Ref().release(); + } + + static void Release(void* info) { + auto that = static_cast(info); + that->Unref(); + } + + static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* client_callback_info); + + private: + CFTypeUniqueRef cf_read_stream_; + CFTypeUniqueRef cf_write_stream_; + + std::shared_ptr engine_; + + EventEngine::ResolvedAddress peer_address_; + EventEngine::ResolvedAddress local_address_; + std::string peer_address_string_; + std::string local_address_string_; + MemoryAllocator memory_allocator_; + + LockfreeEvent open_event_; + LockfreeEvent read_event_; + LockfreeEvent write_event_; +}; + +class CFStreamEndpoint : public EventEngine::Endpoint { + public: + CFStreamEndpoint(std::shared_ptr engine, + MemoryAllocator memory_allocator) { + impl_ = grpc_core::MakeRefCounted( + std::move(engine), std::move(memory_allocator)); + } + ~CFStreamEndpoint() override { impl_->Shutdown(); } + + bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const ReadArgs* args) override { + return impl_->Read(std::move(on_read), buffer, args); + } + bool Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* args) override { + return impl_->Write(std::move(on_writable), data, args); + } + + const EventEngine::ResolvedAddress& GetPeerAddress() const override { + return impl_->GetPeerAddress(); + } + const EventEngine::ResolvedAddress& GetLocalAddress() const override { + return impl_->GetLocalAddress(); + } + + public: + void Connect(absl::AnyInvocable on_connect, + EventEngine::ResolvedAddress addr) { + impl_->Connect(std::move(on_connect), std::move(addr)); + } + bool CancelConnect(absl::Status status) { + return impl_->CancelConnect(std::move(status)); + } + + private: + grpc_core::RefCountedPtr impl_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GPR_APPLE + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFSTREAM_ENDPOINT_H diff --git a/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h new file mode 100644 index 00000000000..c7915170f62 --- /dev/null +++ b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h @@ -0,0 +1,79 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H +#include + +#ifdef GPR_APPLE + +#include + +namespace grpc_event_engine { +namespace experimental { + +template +class CFTypeUniqueRef { + static_assert(std::is_convertible::value, + "T should be `CFXxxRef` type"); + + public: + /* implicit */ + CFTypeUniqueRef(T cf_type_ref = nullptr) : cf_type_ref_(cf_type_ref) {} + ~CFTypeUniqueRef() { reset(); } + + CFTypeUniqueRef(CFTypeUniqueRef const&) = delete; + CFTypeUniqueRef& operator=(CFTypeUniqueRef const&) = delete; + + CFTypeUniqueRef(CFTypeUniqueRef&& other) : cf_type_ref_(other.release()){}; + CFTypeUniqueRef& operator=(CFTypeUniqueRef&& other) { + reset(other.release()); + return *this; + } + + operator T() { return cf_type_ref_; } + + // Note: this is for passing a CFTypeRef as output parameter to a CF API, the + // current ref is released (if any) regardless of whether new value is set + T* operator&() { + reset(); + return &cf_type_ref_; + } + + T release() { + T old = cf_type_ref_; + cf_type_ref_ = nullptr; + return old; + } + + void reset(T other = nullptr) { + if (cf_type_ref_ == other) { + return; + } + T old = cf_type_ref_; + cf_type_ref_ = other; + if (old) { + CFRelease(old); + } + } + + private: + T cf_type_ref_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GPR_APPLE + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_CF_ENGINE_CFTYPE_UNIQUE_REF_H diff --git a/src/core/lib/event_engine/default_event_engine_factory.cc b/src/core/lib/event_engine/default_event_engine_factory.cc index a4311bc9668..10bfe898650 100644 --- a/src/core/lib/event_engine/default_event_engine_factory.cc +++ b/src/core/lib/event_engine/default_event_engine_factory.cc @@ -20,7 +20,7 @@ #include -#ifdef GPR_WINDOWS +#if defined(GPR_WINDOWS) #include "src/core/lib/event_engine/windows/windows_engine.h" namespace grpc_event_engine { @@ -32,7 +32,19 @@ std::unique_ptr DefaultEventEngineFactory() { } // namespace experimental } // namespace grpc_event_engine -#else // not GPR_WINDOWS +#elif defined(GRPC_CFSTREAM) +#include "src/core/lib/event_engine/cf_engine/cf_engine.h" + +namespace grpc_event_engine { +namespace experimental { + +std::unique_ptr DefaultEventEngineFactory() { + return std::make_unique(); +} + +} // namespace experimental +} // namespace grpc_event_engine +#else #include "src/core/lib/event_engine/posix_engine/posix_engine.h" namespace grpc_event_engine { diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 434a0199e8a..2978139b606 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -188,7 +188,7 @@ void CFStreamHandle::Ref(const char* file, int line, const char* reason) { void CFStreamHandle::Unref(const char* file, int line, const char* reason) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); - gpr_log(GPR_DEBUG, + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, val - 1); } diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index edfea7c802f..673a0b9f368 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -240,7 +240,7 @@ static void WriteAction(void* arg, grpc_error_handle error) { } static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, bool urgent, + grpc_closure* cb, bool /*urgent*/, int /*min_progress_size*/) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { @@ -258,7 +258,8 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg, int /*max_frame_size*/) { + grpc_closure* cb, void* /*arg*/, + int /*max_frame_size*/) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", @@ -304,14 +305,15 @@ absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) { return ep_impl->local_address; } -int CFStreamGetFD(grpc_endpoint* ep) { return 0; } +int CFStreamGetFD(grpc_endpoint* /*ep*/) { return 0; } -bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; } +bool CFStreamCanTrackErr(grpc_endpoint* /*ep*/) { return false; } -void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} -void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, - grpc_pollset_set* pollset) {} +void CFStreamAddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {} +void CFStreamAddToPollsetSet(grpc_endpoint* /*ep*/, + grpc_pollset_set* /*pollset*/) {} +void CFStreamDeleteFromPollsetSet(grpc_endpoint* /*ep*/, + grpc_pollset_set* /*pollset*/) {} static const grpc_endpoint_vtable vtable = {CFStreamRead, CFStreamWrite, diff --git a/src/core/lib/iomgr/ev_apple.cc b/src/core/lib/iomgr/ev_apple.cc index efc7639ac91..ba6c21d670c 100644 --- a/src/core/lib/iomgr/ev_apple.cc +++ b/src/core/lib/iomgr/ev_apple.cc @@ -118,7 +118,7 @@ static void grpc_apple_register_write_stream_queue( /// be issued to the run loop when a network event happens and will be driven by /// the global run loop thread gGlobalRunLoopThread. static void grpc_apple_register_read_stream_run_loop( - CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { + CFReadStreamRef read_stream, dispatch_queue_t /*dispatch_queue*/) { GRPC_POLLING_TRACE("Register read stream: %p", read_stream); grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop, @@ -131,7 +131,7 @@ static void grpc_apple_register_read_stream_run_loop( /// be issued to the run loop when a network event happens, and will be driven /// by the global run loop thread gGlobalRunLoopThread. static void grpc_apple_register_write_stream_run_loop( - CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { + CFWriteStreamRef write_stream, dispatch_queue_t /*dispatch_queue*/) { GRPC_POLLING_TRACE("Register write stream: %p", write_stream); grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); CFWriteStreamScheduleWithRunLoop( @@ -163,7 +163,7 @@ void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, /// Drive the run loop in a global singleton thread until the global run loop is /// shutdown. -static void GlobalRunLoopFunc(void* arg) { +static void GlobalRunLoopFunc(void* /*arg*/) { grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu); gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent(); gGlobalRunLoopContext->init_cv.Signal(); @@ -342,15 +342,15 @@ grpc_pollset_vtable grpc_apple_pollset_vtable = { // pollset_set implementation grpc_pollset_set* pollset_set_create(void) { return nullptr; } -void pollset_set_destroy(grpc_pollset_set* pollset_set) {} -void pollset_set_add_pollset(grpc_pollset_set* pollset_set, - grpc_pollset* pollset) {} -void pollset_set_del_pollset(grpc_pollset_set* pollset_set, - grpc_pollset* pollset) {} -void pollset_set_add_pollset_set(grpc_pollset_set* bag, - grpc_pollset_set* item) {} -void pollset_set_del_pollset_set(grpc_pollset_set* bag, - grpc_pollset_set* item) {} +void pollset_set_destroy(grpc_pollset_set* /*pollset_set*/) {} +void pollset_set_add_pollset(grpc_pollset_set* /*pollset_set*/, + grpc_pollset* /*pollset*/) {} +void pollset_set_del_pollset(grpc_pollset_set* /*pollset_set*/, + grpc_pollset* /*pollset*/) {} +void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/, + grpc_pollset_set* /*item*/) {} +void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/, + grpc_pollset_set* /*item*/) {} grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = { pollset_set_create, pollset_set_destroy, diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc index ddb514000fe..8e2d2186664 100644 --- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc +++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc @@ -73,7 +73,7 @@ static bool apple_iomgr_platform_is_any_background_poller_thread(void) { } static bool apple_iomgr_platform_add_closure_to_background_poller( - grpc_closure* closure, grpc_error_handle error) { + grpc_closure* /*closure*/, grpc_error_handle /*error*/) { return false; } diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index 7be3637ac42..09e7c5d799a 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -33,6 +33,7 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/shim.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/iomgr/cfstream_handle.h" @@ -40,6 +41,7 @@ #include "src/core/lib/iomgr/endpoint_cfstream.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error_cfstream.h" +#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/timer.h" @@ -149,9 +151,14 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, static int64_t CFStreamClientConnect( grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_event_engine::experimental::EndpointConfig& /*config*/, + grpc_pollset_set* /*interested_parties*/, + const grpc_event_engine::experimental::EndpointConfig& config, const grpc_resolved_address* resolved_addr, grpc_core::Timestamp deadline) { + if (grpc_event_engine::experimental::UseEventEngineClient()) { + return grpc_event_engine::experimental::event_engine_tcp_client_connect( + closure, ep, config, resolved_addr, deadline); + } + auto addr_uri = grpc_sockaddr_to_uri(resolved_addr); if (!addr_uri.ok()) { grpc_error_handle error = GRPC_ERROR_CREATE(addr_uri.status().ToString()); @@ -198,7 +205,11 @@ static int64_t CFStreamClientConnect( return 0; } -static bool CFStreamClientCancelConnect(int64_t /*connection_handle*/) { +static bool CFStreamClientCancelConnect(int64_t connection_handle) { + if (grpc_event_engine::experimental::UseEventEngineClient()) { + return grpc_event_engine::experimental:: + event_engine_tcp_client_cancel_connect(connection_handle); + } return false; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index e6c288401fd..151465b521b 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -488,6 +488,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/debug/stats.cc', 'src/core/lib/debug/stats_data.cc', 'src/core/lib/debug/trace.cc', + 'src/core/lib/event_engine/cf_engine/cf_engine.cc', + 'src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc', 'src/core/lib/event_engine/channel_args_endpoint_config.cc', 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', diff --git a/test/core/event_engine/cf/BUILD b/test/core/event_engine/cf/BUILD new file mode 100644 index 00000000000..9bc735bc860 --- /dev/null +++ b/test/core/event_engine/cf/BUILD @@ -0,0 +1,34 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_test") + +licenses(["notice"]) + +grpc_cc_test( + name = "cf_engine_test", + timeout = "short", + srcs = ["cf_engine_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_linux", + "no_windows", + ], + deps = [ + "//:gpr_platform", + "//src/core:cf_event_engine", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/cf/cf_engine_test.cc b/test/core/event_engine/cf/cf_engine_test.cc new file mode 100644 index 00000000000..f50b40f82fa --- /dev/null +++ b/test/core/event_engine/cf/cf_engine_test.cc @@ -0,0 +1,96 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#ifdef GPR_APPLE + +#include + +#include "absl/status/status.h" +#include "gtest/gtest.h" + +#include +#include + +#include "src/core/lib/event_engine/cf_engine/cf_engine.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "test/core/util/port.h" + +using namespace std::chrono_literals; + +namespace grpc_event_engine { +namespace experimental { + +TEST(CFEventEngineTest, TestConnectionTimeout) { + // use a non-routable IP so connection will timeout + auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234"); + GPR_ASSERT(resolved_addr.ok()); + + grpc_core::MemoryQuota memory_quota("cf_engine_test"); + grpc_core::Notification client_signal; + auto cf_engine = std::make_shared(); + + ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set( + GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default())); + cf_engine->Connect( + [&client_signal](auto endpoint) { + EXPECT_EQ(endpoint.status().code(), + absl::StatusCode::kDeadlineExceeded); + client_signal.Notify(); + }, + *resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1ms); + + client_signal.WaitForNotification(); +} + +TEST(CFEventEngineTest, TestConnectionCancelled) { + // use a non-routable IP so to cancel connection before timeout + auto resolved_addr = URIToResolvedAddress("ipv4:10.255.255.255:1234"); + GPR_ASSERT(resolved_addr.ok()); + + grpc_core::MemoryQuota memory_quota("cf_engine_test"); + grpc_core::Notification client_signal; + auto cf_engine = std::make_shared(); + + ChannelArgsEndpointConfig config(grpc_core::ChannelArgs().Set( + GRPC_ARG_RESOURCE_QUOTA, grpc_core::ResourceQuota::Default())); + auto conn_handle = cf_engine->Connect( + [&client_signal](auto endpoint) { + EXPECT_EQ(endpoint.status().code(), absl::StatusCode::kCancelled); + client_signal.Notify(); + }, + *resolved_addr, config, memory_quota.CreateMemoryAllocator("conn1"), 1h); + + cf_engine->CancelConnect(conn_handle); + client_signal.WaitForNotification(); +} + +} // namespace experimental +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_init(); + int status = RUN_ALL_TESTS(); + grpc_shutdown(); + return status; +} + +#else // not GPR_APPLE +int main(int /* argc */, char** /* argv */) { return 0; } +#endif diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 0d8ee42474b..6576c2157eb 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -224,6 +224,7 @@ grpc_cc_test( external_deps = ["gtest"], language = "C++", tags = [ + "no_mac", "no_windows", ], uses_event_engine = True, diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 620380bd94c..8e8592896cf 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -40,6 +40,7 @@ grpc_cc_test( name = "posix_event_engine_test", srcs = ["posix_event_engine_test.cc"], tags = [ + "no_mac", "no_windows", ], uses_event_engine = True, @@ -98,9 +99,11 @@ grpc_cc_test( "no_linux", "no_windows", ], - uses_polling = False, + uses_polling = True, deps = [ "//src/core:cf_event_engine", + "//test/core/event_engine/test_suite/posix:oracle_event_engine_posix", + "//test/core/event_engine/test_suite/tests:client", "//test/core/event_engine/test_suite/tests:timer", ], ) diff --git a/test/core/event_engine/test_suite/cf_event_engine_test.cc b/test/core/event_engine/test_suite/cf_event_engine_test.cc index 1f3c800902c..1d222514cb6 100644 --- a/test/core/event_engine/test_suite/cf_event_engine_test.cc +++ b/test/core/event_engine/test_suite/cf_event_engine_test.cc @@ -19,6 +19,8 @@ #include "src/core/lib/event_engine/cf_engine/cf_engine.h" #include "test/core/event_engine/test_suite/event_engine_test_framework.h" +#include "test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h" +#include "test/core/event_engine/test_suite/tests/client_test.h" #include "test/core/event_engine/test_suite/tests/timer_test.h" #include "test/core/util/test_config.h" @@ -28,8 +30,15 @@ int main(int argc, char** argv) { auto factory = []() { return std::make_unique(); }; - SetEventEngineFactories(factory, factory); + auto oracle_factory = []() { + return std::make_unique< + grpc_event_engine::experimental::PosixOracleEventEngine>(); + }; + SetEventEngineFactories(factory, oracle_factory); grpc_event_engine::experimental::InitTimerTests(); + grpc_event_engine::experimental::InitClientTests(); + // TODO(vigneshbabu): remove when the experiment is over + grpc_core::ForceEnableExperiment("event_engine_client", true); // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first // until we clear out the iomgr shutdown code. grpc_init(); diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 91d92daee1a..eaed279d562 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2052,6 +2052,11 @@ src/core/lib/debug/stats_data.cc \ src/core/lib/debug/stats_data.h \ src/core/lib/debug/trace.cc \ src/core/lib/debug/trace.h \ +src/core/lib/event_engine/cf_engine/cf_engine.cc \ +src/core/lib/event_engine/cf_engine/cf_engine.h \ +src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \ +src/core/lib/event_engine/cf_engine/cfstream_endpoint.h \ +src/core/lib/event_engine/cf_engine/cftype_unique_ref.h \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/channel_args_endpoint_config.h \ src/core/lib/event_engine/common_closures.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 310e775884f..b54798074d0 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1830,6 +1830,11 @@ src/core/lib/debug/stats_data.cc \ src/core/lib/debug/stats_data.h \ src/core/lib/debug/trace.cc \ src/core/lib/debug/trace.h \ +src/core/lib/event_engine/cf_engine/cf_engine.cc \ +src/core/lib/event_engine/cf_engine/cf_engine.h \ +src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc \ +src/core/lib/event_engine/cf_engine/cfstream_endpoint.h \ +src/core/lib/event_engine/cf_engine/cftype_unique_ref.h \ src/core/lib/event_engine/channel_args_endpoint_config.cc \ src/core/lib/event_engine/channel_args_endpoint_config.h \ src/core/lib/event_engine/common_closures.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index e8eed2dc675..60440ebed29 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1345,6 +1345,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "cf_engine_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -1365,7 +1387,7 @@ "mac", "posix" ], - "uses_polling": false + "uses_polling": true }, { "args": [], @@ -5546,7 +5568,6 @@ "benchmark": false, "ci_platforms": [ "linux", - "mac", "posix" ], "cpu_cost": 1.0, @@ -5558,7 +5579,6 @@ "name": "posix_event_engine_connect_test", "platforms": [ "linux", - "mac", "posix" ], "uses_polling": true @@ -5568,7 +5588,6 @@ "benchmark": false, "ci_platforms": [ "linux", - "mac", "posix" ], "cpu_cost": 1.0, @@ -5580,7 +5599,6 @@ "name": "posix_event_engine_test", "platforms": [ "linux", - "mac", "posix" ], "uses_polling": true