From 8cf04e9a5405c3f6923276eb01ca0122f4223533 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Fri, 20 Jan 2023 19:47:14 -0800 Subject: [PATCH] [EventEngine] Modify iomgr to allow creation of event engine clients and client side endpoints (#31661) * [WIP] EventEngine iomgr endpoint shims * [WIP] EventEngine::Endpoint iomgr shims for the PosixEventEngine * Util functions to help with posix engine listener implementation * sanity * update comments in posix_engine_listener_utils.h * review comments * iwyu * revert prev commit * iwyu * update build * update * regenerate projects * regenerate projects * minor fixes * update BUILD * sanity * update build * regenerate projects * fix unused parameter * sanity * update * sanity * regenerate_projects * remove unused variable * start * update * regenerate_projects * sanity * update * fixes * update * regenerate_projects * update * fix sanity and msan failure * more fixes * build failure * update * fix * sanity * fixes * update * regenerate projects * fix sanity * review comments * An EventEngine subclass to be implemented by all posix based event engines * sanity * comments * update * review comments * re-word * fix * update * review comments * regenerate projects * syntax fix * add lock free event benchmark * releasable mutex lock * fix build isue * update * start * regenerate projects * update * fix * windows build * update * windows portability issue * update * update * update * update * format * update * update * update * update * update * fix sanity * regenerate projects * update * iwyu * Fix resolved address length related bugs in tcp_socket_utils and listener_utils * iwyu * cleanup src/core/lib/event_engine/tcp_socket_utils.cc * iwyu * fix * regenerate projects * fix sanity * re-write endpoint shim * more re-write * cleanup * update * review comments * build issue * more build issue fixes plus adding event_engine_trace * even more build issue fixes * iwyu * add static_cast * update * remove redundant code * update * deduplicate * iwyu * Fix review comments and regenerate_projects * sanity * review comments * fix include guards Co-authored-by: AJ Heller --- BUILD | 54 ++- CMakeLists.txt | 12 + Makefile | 6 + build_autogenerated.yaml | 31 ++ config.m4 | 4 + config.w32 | 4 + gRPC-C++.podspec | 8 + gRPC-Core.podspec | 11 + grpc.gemspec | 7 + grpc.gyp | 9 + include/grpc/event_engine/slice_buffer.h | 12 + package.xml | 7 + src/core/BUILD | 30 +- .../posix_engine/posix_endpoint.h | 7 +- .../event_engine/posix_engine/posix_engine.cc | 12 +- src/core/lib/event_engine/resolved_address.cc | 19 + .../event_engine/resolved_address_internal.h | 34 ++ src/core/lib/event_engine/shim.cc | 36 ++ src/core/lib/event_engine/shim.h | 27 ++ src/core/lib/event_engine/tcp_socket_utils.cc | 16 + src/core/lib/event_engine/tcp_socket_utils.h | 5 + .../lib/iomgr/event_engine_shims/endpoint.cc | 388 ++++++++++++++++++ .../lib/iomgr/event_engine_shims/endpoint.h | 34 ++ .../iomgr/event_engine_shims/tcp_client.cc | 82 ++++ .../lib/iomgr/event_engine_shims/tcp_client.h | 44 ++ src/core/lib/iomgr/tcp_client_posix.cc | 12 + src/python/grpcio/grpc_core_dependencies.py | 3 + .../event_engine/posix/posix_endpoint_test.cc | 9 +- .../posix/posix_event_engine_connect_test.cc | 18 +- .../event_engine/test_suite/client_test.cc | 15 +- .../test_suite/event_engine_test_utils.cc | 21 +- .../test_suite/event_engine_test_utils.h | 2 - .../event_engine/test_suite/server_test.cc | 14 +- test/core/iomgr/BUILD | 6 +- test/core/iomgr/endpoint_pair_test.cc | 83 +++- test/core/iomgr/endpoint_tests.cc | 35 +- test/core/util/BUILD | 1 + tools/doxygen/Doxyfile.c++.internal | 7 + tools/doxygen/Doxyfile.core.internal | 7 + 39 files changed, 1057 insertions(+), 75 deletions(-) create mode 100644 src/core/lib/event_engine/resolved_address_internal.h create mode 100644 src/core/lib/event_engine/shim.cc create mode 100644 src/core/lib/event_engine/shim.h create mode 100644 src/core/lib/iomgr/event_engine_shims/endpoint.cc create mode 100644 src/core/lib/iomgr/event_engine_shims/endpoint.h create mode 100644 src/core/lib/iomgr/event_engine_shims/tcp_client.cc create mode 100644 src/core/lib/iomgr/event_engine_shims/tcp_client.h diff --git a/BUILD b/BUILD index 200b37eba8d..1da9f989cc4 100644 --- a/BUILD +++ b/BUILD @@ -1173,7 +1173,6 @@ grpc_cc_library( grpc_cc_library( name = "grpc_base", srcs = [ - "//src/core:lib/address_utils/parse_address.cc", "//src/core:lib/channel/channel_stack.cc", "//src/core:lib/channel/channel_stack_builder_impl.cc", "//src/core:lib/channel/channel_trace.cc", @@ -1204,8 +1203,6 @@ grpc_cc_library( "//src/core:lib/iomgr/gethostname_fallback.cc", "//src/core:lib/iomgr/gethostname_host_name_max.cc", "//src/core:lib/iomgr/gethostname_sysconf.cc", - "//src/core:lib/iomgr/grpc_if_nametoindex_posix.cc", - "//src/core:lib/iomgr/grpc_if_nametoindex_unsupported.cc", "//src/core:lib/iomgr/internal_errqueue.cc", "//src/core:lib/iomgr/iocp_windows.cc", "//src/core:lib/iomgr/iomgr.cc", @@ -1273,9 +1270,15 @@ grpc_cc_library( "//src/core:lib/transport/timeout_encoding.cc", "//src/core:lib/transport/transport.cc", "//src/core:lib/transport/transport_op_string.cc", + ] + + # TODO(vigneshbabu): remove these + # These headers used to be vended by this target, but they have to be + # removed after landing event engine. + [ + "//src/core:lib/iomgr/event_engine_shims/endpoint.cc", + "//src/core:lib/iomgr/event_engine_shims/tcp_client.cc", ], hdrs = [ - "//src/core:lib/address_utils/parse_address.h", "//src/core:lib/channel/call_finalization.h", "//src/core:lib/channel/call_tracer.h", "//src/core:lib/channel/channel_stack.h", @@ -1303,7 +1306,6 @@ grpc_cc_library( "//src/core:lib/iomgr/ev_poll_posix.h", "//src/core:lib/iomgr/ev_posix.h", "//src/core:lib/iomgr/gethostname.h", - "//src/core:lib/iomgr/grpc_if_nametoindex.h", "//src/core:lib/iomgr/internal_errqueue.h", "//src/core:lib/iomgr/iocp_windows.h", "//src/core:lib/iomgr/iomgr.h", @@ -1358,6 +1360,13 @@ grpc_cc_library( "//src/core:lib/transport/timeout_encoding.h", "//src/core:lib/transport/transport.h", "//src/core:lib/transport/transport_impl.h", + ] + + # TODO(vigneshbabu): remove these + # These headers used to be vended by this target, but they have to be + # removed after landing event engine. + [ + "//src/core:lib/iomgr/event_engine_shims/endpoint.h", + "//src/core:lib/iomgr/event_engine_shims/tcp_client.h", ], defines = select({ "systemd": ["HAVE_LIBSYSTEMD"], @@ -1399,6 +1408,7 @@ grpc_cc_library( "grpc_trace", "iomgr_timer", "orphanable", + "parse_address", "promise", "ref_counted_ptr", "sockaddr_utils", @@ -1422,6 +1432,10 @@ grpc_cc_library( "//src/core:default_event_engine", "//src/core:dual_ref_counted", "//src/core:error", + "//src/core:event_engine_common", + "//src/core:event_engine_shim", + "//src/core:event_engine_tcp_socket_utils", + "//src/core:event_engine_trace", "//src/core:event_log", "//src/core:experiments", "//src/core:gpr_atm", @@ -1442,6 +1456,7 @@ grpc_cc_library( "//src/core:pipe", "//src/core:poll", "//src/core:pollset_set", + "//src/core:posix_event_engine_base_hdrs", "//src/core:promise_status", "//src/core:ref_counted", "//src/core:resolved_address", @@ -2484,6 +2499,34 @@ grpc_cc_library( deps = ["gpr"], ) +grpc_cc_library( + name = "parse_address", + srcs = [ + "//src/core:lib/address_utils/parse_address.cc", + "//src/core:lib/iomgr/grpc_if_nametoindex_posix.cc", + "//src/core:lib/iomgr/grpc_if_nametoindex_unsupported.cc", + ], + hdrs = [ + "//src/core:lib/address_utils/parse_address.h", + "//src/core:lib/iomgr/grpc_if_nametoindex.h", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + "absl/strings", + ], + visibility = ["@grpc:alt_grpc_base_legacy"], + deps = [ + "gpr", + "uri_parser", + "//src/core:error", + "//src/core:grpc_sockaddr", + "//src/core:iomgr_port", + "//src/core:resolved_address", + "//src/core:status_helper", + ], +) + grpc_cc_library( name = "backoff", srcs = [ @@ -2811,6 +2854,7 @@ grpc_cc_library( "grpc_trace", "iomgr_timer", "orphanable", + "parse_address", "ref_counted_ptr", "server_address", "sockaddr_utils", diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b80f699032..8c0cac17b19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2175,6 +2175,7 @@ add_library(grpc src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc src/core/lib/event_engine/resolved_address.cc + src/core/lib/event_engine/shim.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc @@ -2215,6 +2216,8 @@ add_library(grpc src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/endpoint.cc + src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/fork_posix.cc @@ -2848,6 +2851,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc src/core/lib/event_engine/resolved_address.cc + src/core/lib/event_engine/shim.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc @@ -2887,6 +2891,8 @@ add_library(grpc_unsecure src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/endpoint.cc + src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/fork_posix.cc @@ -4336,6 +4342,7 @@ add_library(grpc_authorization_provider src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc src/core/lib/event_engine/resolved_address.cc + src/core/lib/event_engine/shim.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc @@ -4372,6 +4379,8 @@ add_library(grpc_authorization_provider src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/endpoint.cc + src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/fork_posix.cc @@ -11396,6 +11405,7 @@ add_executable(frame_test src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc src/core/lib/event_engine/resolved_address.cc + src/core/lib/event_engine/shim.cc src/core/lib/event_engine/slice.cc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc @@ -11432,6 +11442,8 @@ add_executable(frame_test src/core/lib/iomgr/ev_poll_posix.cc src/core/lib/iomgr/ev_posix.cc src/core/lib/iomgr/ev_windows.cc + src/core/lib/iomgr/event_engine_shims/endpoint.cc + src/core/lib/iomgr/event_engine_shims/tcp_client.cc src/core/lib/iomgr/exec_ctx.cc src/core/lib/iomgr/executor.cc src/core/lib/iomgr/fork_posix.cc diff --git a/Makefile b/Makefile index 171d8077987..06918792f07 100644 --- a/Makefile +++ b/Makefile @@ -1438,6 +1438,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \ src/core/lib/event_engine/resolved_address.cc \ + src/core/lib/event_engine/shim.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/tcp_socket_utils.cc \ @@ -1478,6 +1479,8 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/endpoint.cc \ + src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/fork_posix.cc \ @@ -1970,6 +1973,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \ src/core/lib/event_engine/resolved_address.cc \ + src/core/lib/event_engine/shim.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/tcp_socket_utils.cc \ @@ -2009,6 +2013,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/endpoint.cc \ + src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/fork_posix.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 363229e26c5..2e276b548f9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -801,6 +801,8 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h + - src/core/lib/event_engine/resolved_address_internal.h + - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h @@ -860,6 +862,8 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/endpoint.h + - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h @@ -1562,6 +1566,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - src/core/lib/event_engine/resolved_address.cc + - src/core/lib/event_engine/shim.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc @@ -1602,6 +1607,8 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/endpoint.cc + - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/fork_posix.cc @@ -2116,6 +2123,8 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h + - src/core/lib/event_engine/resolved_address_internal.h + - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h @@ -2174,6 +2183,8 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/endpoint.h + - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h @@ -2497,6 +2508,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - src/core/lib/event_engine/resolved_address.cc + - src/core/lib/event_engine/shim.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc @@ -2536,6 +2548,8 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/endpoint.cc + - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/fork_posix.cc @@ -3557,6 +3571,8 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h + - src/core/lib/event_engine/resolved_address_internal.h + - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h @@ -3611,6 +3627,8 @@ libs: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/endpoint.h + - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h @@ -3816,6 +3834,7 @@ libs: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - src/core/lib/event_engine/resolved_address.cc + - src/core/lib/event_engine/shim.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc @@ -3852,6 +3871,8 @@ libs: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/endpoint.cc + - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/fork_posix.cc @@ -7339,6 +7360,8 @@ targets: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h + - src/core/lib/event_engine/resolved_address_internal.h + - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/socket_notifier.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool.h @@ -7393,6 +7416,8 @@ targets: - src/core/lib/iomgr/ev_epoll1_linux.h - src/core/lib/iomgr/ev_poll_posix.h - src/core/lib/iomgr/ev_posix.h + - src/core/lib/iomgr/event_engine_shims/endpoint.h + - src/core/lib/iomgr/event_engine_shims/tcp_client.h - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h @@ -7580,6 +7605,7 @@ targets: - src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc - src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc - src/core/lib/event_engine/resolved_address.cc + - src/core/lib/event_engine/shim.cc - src/core/lib/event_engine/slice.cc - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc @@ -7616,6 +7642,8 @@ targets: - src/core/lib/iomgr/ev_poll_posix.cc - src/core/lib/iomgr/ev_posix.cc - src/core/lib/iomgr/ev_windows.cc + - src/core/lib/iomgr/event_engine_shims/endpoint.cc + - src/core/lib/iomgr/event_engine_shims/tcp_client.cc - src/core/lib/iomgr/exec_ctx.cc - src/core/lib/iomgr/executor.cc - src/core/lib/iomgr/fork_posix.cc @@ -11493,6 +11521,9 @@ targets: language: c++ headers: - src/core/lib/event_engine/handle_containers.h + - src/core/lib/event_engine/resolved_address_internal.h + - src/core/lib/iomgr/port.h + - src/core/lib/iomgr/resolved_address.h - src/core/lib/slice/slice.h - src/core/lib/slice/slice_buffer.h - src/core/lib/slice/slice_internal.h diff --git a/config.m4 b/config.m4 index b592af708f0..5d2cfed9ae3 100644 --- a/config.m4 +++ b/config.m4 @@ -520,6 +520,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \ src/core/lib/event_engine/resolved_address.cc \ + src/core/lib/event_engine/shim.cc \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/tcp_socket_utils.cc \ @@ -603,6 +604,8 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/ev_poll_posix.cc \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_windows.cc \ + src/core/lib/iomgr/event_engine_shims/endpoint.cc \ + src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/executor.cc \ src/core/lib/iomgr/fork_posix.cc \ @@ -1395,6 +1398,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/handshaker) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/http) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr) + PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr/event_engine_shims) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/json) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/load_balancing) PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/matchers) diff --git a/config.w32 b/config.w32 index 04f916eaa46..69d0aa947ba 100644 --- a/config.w32 +++ b/config.w32 @@ -486,6 +486,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_pipe.cc " + "src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_posix_default.cc " + "src\\core\\lib\\event_engine\\resolved_address.cc " + + "src\\core\\lib\\event_engine\\shim.cc " + "src\\core\\lib\\event_engine\\slice.cc " + "src\\core\\lib\\event_engine\\slice_buffer.cc " + "src\\core\\lib\\event_engine\\tcp_socket_utils.cc " + @@ -569,6 +570,8 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\ev_poll_posix.cc " + "src\\core\\lib\\iomgr\\ev_posix.cc " + "src\\core\\lib\\iomgr\\ev_windows.cc " + + "src\\core\\lib\\iomgr\\event_engine_shims\\endpoint.cc " + + "src\\core\\lib\\iomgr\\event_engine_shims\\tcp_client.cc " + "src\\core\\lib\\iomgr\\exec_ctx.cc " + "src\\core\\lib\\iomgr\\executor.cc " + "src\\core\\lib\\iomgr\\fork_posix.cc " + @@ -1525,6 +1528,7 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\handshaker"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\http"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr\\event_engine_shims"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\json"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\load_balancing"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\matchers"); diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 26804333e9f..9deabf2b158 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -751,6 +751,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', + 'src/core/lib/event_engine/resolved_address_internal.h', + 'src/core/lib/event_engine/shim.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', @@ -835,6 +837,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/endpoint.h', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', @@ -1667,6 +1671,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', + 'src/core/lib/event_engine/resolved_address_internal.h', + 'src/core/lib/event_engine/shim.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', @@ -1751,6 +1757,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/endpoint.h', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 54fc1a2cf78..8d8d1d8ceb4 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1161,6 +1161,9 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', 'src/core/lib/event_engine/resolved_address.cc', + 'src/core/lib/event_engine/resolved_address_internal.h', + 'src/core/lib/event_engine/shim.cc', + 'src/core/lib/event_engine/shim.h', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/socket_notifier.h', @@ -1328,6 +1331,10 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_posix.h', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.h', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.cc', @@ -2339,6 +2346,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h', + 'src/core/lib/event_engine/resolved_address_internal.h', + 'src/core/lib/event_engine/shim.h', 'src/core/lib/event_engine/socket_notifier.h', 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_pool.h', @@ -2423,6 +2432,8 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/ev_epoll1_linux.h', 'src/core/lib/iomgr/ev_poll_posix.h', 'src/core/lib/iomgr/ev_posix.h', + 'src/core/lib/iomgr/event_engine_shims/endpoint.h', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.h', 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', diff --git a/grpc.gemspec b/grpc.gemspec index d39e0aa79b8..b47cc8b8afc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1072,6 +1072,9 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h ) s.files += %w( src/core/lib/event_engine/resolved_address.cc ) + s.files += %w( src/core/lib/event_engine/resolved_address_internal.h ) + s.files += %w( src/core/lib/event_engine/shim.cc ) + s.files += %w( src/core/lib/event_engine/shim.h ) s.files += %w( src/core/lib/event_engine/slice.cc ) s.files += %w( src/core/lib/event_engine/slice_buffer.cc ) s.files += %w( src/core/lib/event_engine/socket_notifier.h ) @@ -1239,6 +1242,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/ev_posix.cc ) s.files += %w( src/core/lib/iomgr/ev_posix.h ) s.files += %w( src/core/lib/iomgr/ev_windows.cc ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.cc ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.h ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.cc ) + s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.h ) s.files += %w( src/core/lib/iomgr/exec_ctx.cc ) s.files += %w( src/core/lib/iomgr/exec_ctx.h ) s.files += %w( src/core/lib/iomgr/executor.cc ) diff --git a/grpc.gyp b/grpc.gyp index c9c892d2d5c..b60de9a3cdf 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -851,6 +851,7 @@ 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc', 'src/core/lib/event_engine/resolved_address.cc', + 'src/core/lib/event_engine/shim.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', @@ -891,6 +892,8 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/fork_posix.cc', @@ -1325,6 +1328,7 @@ 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc', 'src/core/lib/event_engine/resolved_address.cc', + 'src/core/lib/event_engine/shim.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', @@ -1364,6 +1368,8 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/fork_posix.cc', @@ -1824,6 +1830,7 @@ 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc', 'src/core/lib/event_engine/resolved_address.cc', + 'src/core/lib/event_engine/shim.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', @@ -1860,6 +1867,8 @@ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/fork_posix.cc', diff --git a/include/grpc/event_engine/slice_buffer.h b/include/grpc/event_engine/slice_buffer.h index 8a67b276f8f..4eaf48d8f77 100644 --- a/include/grpc/event_engine/slice_buffer.h +++ b/include/grpc/event_engine/slice_buffer.h @@ -136,7 +136,19 @@ class SliceBuffer { /// Return a pointer to the back raw grpc_slice_buffer grpc_slice_buffer* c_slice_buffer() { return &slice_buffer_; } + // Returns a SliceBuffer that transfers slices into this new SliceBuffer, + // leaving the input parameter empty. + static SliceBuffer TakeCSliceBuffer(grpc_slice_buffer& slice_buffer) { + return SliceBuffer(&slice_buffer); + } + private: + // Transfers slices into this new SliceBuffer, leaving the parameter empty. + // Does not take ownership of the slice_buffer argument. + explicit SliceBuffer(grpc_slice_buffer* slice_buffer) { + grpc_slice_buffer_init(&slice_buffer_); + grpc_slice_buffer_swap(&slice_buffer_, slice_buffer); + } /// The backing raw slice buffer. grpc_slice_buffer slice_buffer_; }; diff --git a/package.xml b/package.xml index 85b1f7d00e3..a92c23d2c46 100644 --- a/package.xml +++ b/package.xml @@ -1054,6 +1054,9 @@ + + + @@ -1221,6 +1224,10 @@ + + + + diff --git a/src/core/BUILD b/src/core/BUILD index 6d19833917e..2fecb3c706c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -60,6 +60,7 @@ grpc_cc_library( ], hdrs = [ "lib/event_engine/handle_containers.h", + "lib/event_engine/resolved_address_internal.h", "//:include/grpc/event_engine/slice.h", "//:include/grpc/event_engine/slice_buffer.h", ], @@ -70,6 +71,7 @@ grpc_cc_library( "absl/utility", ], deps = [ + "resolved_address", "slice", "slice_buffer", "slice_cast", @@ -953,6 +955,7 @@ grpc_cc_library( "//:gpr", "//:grpc_base", "//:handshaker", + "//:parse_address", "//:ref_counted_ptr", "//:uri_parser", ], @@ -1872,11 +1875,11 @@ grpc_cc_library( deps = [ "event_engine_common", "event_engine_poller", + "event_engine_shim", "event_engine_tcp_socket_utils", "event_engine_thread_pool", "event_engine_trace", "event_engine_utils", - "experiments", "init_internally", "iomgr_port", "posix_event_engine_base_hdrs", @@ -2011,10 +2014,12 @@ grpc_cc_library( ], deps = [ "iomgr_port", + "resolved_address", "status_helper", "//:event_engine_base_hdrs", "//:gpr", "//:gpr_platform", + "//:parse_address", "//:uri_parser", ], ) @@ -2034,6 +2039,21 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "event_engine_shim", + srcs = [ + "lib/event_engine/shim.cc", + ], + hdrs = [ + "lib/event_engine/shim.h", + ], + deps = [ + "experiments", + "iomgr_port", + "//:gpr_platform", + ], +) + # NOTE: this target gets replaced inside Google's build system to be one that # integrates with other internal systems better. Please do not rename or fold # this into other targets. @@ -2634,6 +2654,7 @@ grpc_cc_library( "//:grpc_credentials_util", "//:grpc_security_base", "//:grpc_trace", + "//:parse_address", "//:promise", "//:ref_counted_ptr", "//:uri_parser", @@ -2768,6 +2789,7 @@ grpc_cc_library( "//:grpc_client_channel", "//:grpc_security_base", "//:handshaker", + "//:parse_address", "//:promise", "//:ref_counted_ptr", "//:sockaddr_utils", @@ -3153,6 +3175,7 @@ grpc_cc_library( "resolved_address", "//:gpr", "//:grpc_base", + "//:parse_address", "//:sockaddr_utils", ], ) @@ -3848,6 +3871,7 @@ grpc_cc_library( "//:grpc_trace", "//:iomgr_timer", "//:orphanable", + "//:parse_address", "//:ref_counted_ptr", "//:server_address", "//:sockaddr_utils", @@ -3921,6 +3945,7 @@ grpc_cc_library( "//:grpc_security_base", "//:grpc_service_config_impl", "//:grpc_trace", + "//:parse_address", "//:ref_counted_ptr", "//:sockaddr_utils", "//:uri_parser", @@ -4630,6 +4655,7 @@ grpc_cc_library( "//:grpc_base", "//:grpc_public_hdrs", "//:grpc_security_base", + "//:parse_address", "//:promise", "//:uri_parser", ], @@ -4731,9 +4757,9 @@ grpc_cc_library( "resolved_address", "//:config", "//:gpr", - "//:grpc_base", "//:grpc_resolver", "//:orphanable", + "//:parse_address", "//:server_address", "//:uri_parser", ], diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index 5dee0b2666a..52e550205c8 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -684,14 +684,13 @@ class PosixEndpoint : public PosixEndpointWithFdSupport { } int GetWrappedFd() override { - GPR_ASSERT(false && - "PosixEndpoint::GetWrappedFd not supported on this platform"); + grpc_core::Crash( + "PosixEndpoint::GetWrappedFd not supported on this platform"); } void Shutdown(absl::AnyInvocable release_fd)> on_release_fd) override { - GPR_ASSERT(false && - "PosixEndpoint::Shutdown not supported on this platform"); + grpc_core::Crash("PosixEndpoint::Shutdown not supported on this platform"); } ~PosixEndpoint() override = default; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 2dc72fbfd75..9543b72d553 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -41,10 +41,10 @@ #include "src/core/lib/event_engine/posix.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/posix_engine/timer.h" +#include "src/core/lib/event_engine/shim.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/utils.h" -#include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/sync.h" @@ -332,7 +332,7 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller) : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)), executor_(std::make_shared()), timer_manager_(executor_) { - if (grpc_core::IsEventEngineClientEnabled()) { + if (UseEventEngineClient()) { poller_manager_ = std::make_shared(poller); } } @@ -341,7 +341,7 @@ PosixEventEngine::PosixEventEngine() : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)), executor_(std::make_shared()), timer_manager_(executor_) { - if (grpc_core::IsEventEngineClientEnabled()) { + if (UseEventEngineClient()) { poller_manager_ = std::make_shared(executor_); if (poller_manager_->Poller() != nullptr) { executor_->Run([poller_manager = poller_manager_]() { @@ -539,7 +539,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect( OnConnectCallback on_connect, const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, Duration timeout) { - if (!grpc_core::IsEventEngineClientEnabled()) { + if (!UseEventEngineClient()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP @@ -564,7 +564,7 @@ std::unique_ptr PosixEventEngine::CreatePosixEndpointFromFd(int fd, const EndpointConfig& config, MemoryAllocator memory_allocator) { - if (!grpc_core::IsEventEngineClientEnabled()) { + if (!UseEventEngineClient()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP @@ -613,7 +613,7 @@ PosixEventEngine::CreatePosixListener( absl::AnyInvocable on_shutdown, const EndpointConfig& config, std::unique_ptr memory_allocator_factory) { - if (!grpc_core::IsEventEngineClientEnabled()) { + if (!UseEventEngineClient()) { grpc_core::Crash("unimplemented"); } #ifdef GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/resolved_address.cc b/src/core/lib/event_engine/resolved_address.cc index 75cf2b08a96..49ea888b614 100644 --- a/src/core/lib/event_engine/resolved_address.cc +++ b/src/core/lib/event_engine/resolved_address.cc @@ -14,11 +14,15 @@ #include +#include "src/core/lib/iomgr/resolved_address.h" + #include #include #include +#include "src/core/lib/event_engine/resolved_address_internal.h" + // IWYU pragma: no_include namespace grpc_event_engine { @@ -37,5 +41,20 @@ const struct sockaddr* EventEngine::ResolvedAddress::address() const { socklen_t EventEngine::ResolvedAddress::size() const { return size_; } +EventEngine::ResolvedAddress CreateResolvedAddress( + const grpc_resolved_address& addr) { + return EventEngine::ResolvedAddress( + reinterpret_cast(addr.addr), addr.len); +} + +grpc_resolved_address CreateGRPCResolvedAddress( + const EventEngine::ResolvedAddress& ra) { + grpc_resolved_address grpc_addr; + memset(&grpc_addr, 0, sizeof(grpc_resolved_address)); + memcpy(grpc_addr.addr, ra.address(), ra.size()); + grpc_addr.len = ra.size(); + return grpc_addr; +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/resolved_address_internal.h b/src/core/lib/event_engine/resolved_address_internal.h new file mode 100644 index 00000000000..c0de7cbf5a1 --- /dev/null +++ b/src/core/lib/event_engine/resolved_address_internal.h @@ -0,0 +1,34 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H +#include + +#include + +#include "src/core/lib/iomgr/resolved_address.h" + +namespace grpc_event_engine { +namespace experimental { + +EventEngine::ResolvedAddress CreateResolvedAddress( + const grpc_resolved_address& addr); + +grpc_resolved_address CreateGRPCResolvedAddress( + const EventEngine::ResolvedAddress& ra); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H diff --git a/src/core/lib/event_engine/shim.cc b/src/core/lib/event_engine/shim.cc new file mode 100644 index 00000000000..f4249a42f04 --- /dev/null +++ b/src/core/lib/event_engine/shim.cc @@ -0,0 +1,36 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/shim.h" + +#include "src/core/lib/experiments/experiments.h" +#include "src/core/lib/iomgr/port.h" + +namespace grpc_event_engine { +namespace experimental { + +bool UseEventEngineClient() { +// TODO(hork, eryu): Adjust the ifdefs accordingly when event engine's become +// available for other platforms. +#ifdef GRPC_POSIX_SOCKET_TCP + return grpc_core::IsEventEngineClientEnabled(); +#else + return false; +#endif +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/shim.h b/src/core/lib/event_engine/shim.h new file mode 100644 index 00000000000..918ed576df5 --- /dev/null +++ b/src/core/lib/event_engine/shim.h @@ -0,0 +1,27 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H + +#include + +namespace grpc_event_engine { +namespace experimental { + +bool UseEventEngineClient(); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H diff --git a/src/core/lib/event_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/tcp_socket_utils.cc index 0fff80240ee..4a198eac7fe 100644 --- a/src/core/lib/event_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/tcp_socket_utils.cc @@ -17,6 +17,7 @@ #include +#include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON @@ -50,6 +51,7 @@ #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/uri/uri_parser.h" namespace grpc_event_engine { @@ -368,5 +370,19 @@ absl::StatusOr ResolvedAddressToURI( return uri->ToString(); } +absl::StatusOr URIToResolvedAddress( + std::string address_str) { + grpc_resolved_address addr; + absl::StatusOr uri = grpc_core::URI::Parse(address_str); + if (!uri.ok()) { + gpr_log(GPR_ERROR, "Failed to parse URI. Error: %s", + uri.status().ToString().c_str()); + } + GRPC_RETURN_IF_ERROR(uri.status()); + GPR_ASSERT(grpc_parse_uri(*uri, &addr)); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr.addr), addr.len); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/tcp_socket_utils.h b/src/core/lib/event_engine/tcp_socket_utils.h index ea547be8b39..ac72d3ebc06 100644 --- a/src/core/lib/event_engine/tcp_socket_utils.h +++ b/src/core/lib/event_engine/tcp_socket_utils.h @@ -79,6 +79,11 @@ absl::StatusOr ResolvedAddressToNormalizedString( absl::StatusOr ResolvedAddressToURI( const EventEngine::ResolvedAddress& resolved_address); +// Given a URI string, returns the corresponding resolved address if the URI +// is valid. Otherwise it returns an appropriate error. +absl::StatusOr URIToResolvedAddress( + std::string address_str); + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/event_engine_shims/endpoint.cc b/src/core/lib/iomgr/event_engine_shims/endpoint.cc new file mode 100644 index 00000000000..ff727d51dff --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/endpoint.cc @@ -0,0 +1,388 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" + +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/event_engine/posix.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/port.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/transport/error_utils.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +namespace grpc_event_engine { +namespace experimental { + +namespace { + +constexpr int64_t kShutdownBit = static_cast(1) << 32; + +// A wrapper class to manage Event Engine endpoint ref counting and +// asynchronous shutdown. +class EventEngineEndpointWrapper { + public: + struct grpc_event_engine_endpoint { + grpc_endpoint base; + EventEngineEndpointWrapper* wrapper; + std::aligned_storage::type + read_buffer; + std::aligned_storage::type + write_buffer; + }; + + explicit EventEngineEndpointWrapper( + std::unique_ptr endpoint); + + int Fd() { + grpc_core::MutexLock lock(&mu_); + return fd_; + } + + absl::string_view PeerAddress() { + grpc_core::MutexLock lock(&mu_); + return peer_address_; + } + + absl::string_view LocalAddress() { + grpc_core::MutexLock lock(&mu_); + return local_address_; + } + + void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + delete this; + } + } + + // Returns a managed grpc_endpoint object. It retains ownership of the + // object. + grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; } + + // Read using the underlying EventEngine endpoint object. + void Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const EventEngine::Endpoint::ReadArgs* args) { + endpoint_->Read(std::move(on_read), buffer, args); + } + + // Write using the underlying EventEngine endpoint object + void Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args) { + endpoint_->Write(std::move(on_writable), data, args); + } + + // Returns true if the endpoint is not yet shutdown. In that case, it also + // acquires a shutdown ref. Otherwise it returns false and doesn't modify + // the shutdown ref. + bool ShutdownRef() { + int64_t curr = shutdown_ref_.load(std::memory_order_acquire); + while (true) { + if (curr & kShutdownBit) { + return false; + } + if (shutdown_ref_.compare_exchange_strong(curr, curr + 1, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return true; + } + } + } + + // Decrement the shutdown ref. If this is the last shutdown ref, it also + // deletes the underlying event engine endpoint. Deletion of the event + // engine endpoint should trigger execution of any pending read/write + // callbacks with NOT-OK status. + void ShutdownUnref() { + if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == + kShutdownBit + 1) { + OnShutdownInternal(); + } + } + + // If trigger shutdown is called the first time, it sets the shutdown bit + // and decrements the shutdown ref. If trigger shutdown has been called + // before or in parallel, only one of them would win the race. The other + // invocation would simply return. + void TriggerShutdown() { + int64_t curr = shutdown_ref_.load(std::memory_order_acquire); + while (true) { + if (curr & kShutdownBit) { + return; + } + if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit, + std::memory_order_acq_rel, + std::memory_order_relaxed)) { + Ref(); + if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) == + kShutdownBit + 1) { + OnShutdownInternal(); + } + return; + } + } + } + + private: + void OnShutdownInternal() { + { + grpc_core::MutexLock lock(&mu_); + fd_ = -1; + local_address_ = ""; + peer_address_ = ""; + } + endpoint_.reset(); + // For the Ref taken in TriggerShutdown + Unref(); + } + std::unique_ptr endpoint_; + std::unique_ptr eeep_; + std::atomic refs_{1}; + std::atomic shutdown_ref_{1}; + grpc_core::Mutex mu_; + std::string peer_address_; + std::string local_address_; + int fd_{-1}; +}; + +// Read from the endpoint and place the data in slices slice buffer. The +// provided closure is also invoked asynchronously. +void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb, bool /* urgent */, int min_progress_size) { + auto* eeep = + reinterpret_cast( + ep); + if (!eeep->wrapper->ShutdownRef()) { + // Shutdown has already been triggered on the endpoint. + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError()); + return; + } + + eeep->wrapper->Ref(); + EventEngine::Endpoint::ReadArgs read_args = {min_progress_size}; + + // TODO(vigneshbabu): Use SliceBufferCast<> here. + SliceBuffer* read_buffer = new (&eeep->read_buffer) + SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices)); + read_buffer->Clear(); + eeep->wrapper->Read( + [eeep, cb, slices](absl::Status status) { + auto* read_buffer = reinterpret_cast(&eeep->read_buffer); + grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(), slices); + read_buffer->~SliceBuffer(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { + size_t i; + gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep->wrapper, + std::string(eeep->wrapper->PeerAddress()).c_str(), + status.ToString().c_str()); + if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { + for (i = 0; i < slices->count; i++) { + char* dump = grpc_dump_slice(slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ DATA: %s", dump); + gpr_free(dump); + } + } + } + { + grpc_core::ApplicationCallbackExecCtx app_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status); + } + // For the ref taken in EndpointRead + eeep->wrapper->Unref(); + }, + read_buffer, &read_args); + + eeep->wrapper->ShutdownUnref(); +} + +// Write the data from slices and invoke the provided closure asynchronously +// after the write is complete. +void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb, void* arg, int max_frame_size) { + auto* eeep = + reinterpret_cast( + ep); + if (!eeep->wrapper->ShutdownRef()) { + // Shutdown has already been triggered on the endpoint. + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError()); + return; + } + + eeep->wrapper->Ref(); + EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size}; + if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { + size_t i; + gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", eeep->wrapper, + std::string(eeep->wrapper->PeerAddress()).c_str()); + if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) { + for (i = 0; i < slices->count; i++) { + char* dump = + grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump); + gpr_free(dump); + } + } + } + + // TODO(vigneshbabu): Use SliceBufferCast<> here. + SliceBuffer* write_buffer = new (&eeep->write_buffer) + SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices)); + eeep->wrapper->Write( + [eeep, cb](absl::Status status) { + auto* write_buffer = + reinterpret_cast(&eeep->write_buffer); + write_buffer->~SliceBuffer(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { + gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", eeep->wrapper, + std::string(eeep->wrapper->PeerAddress()).c_str(), + status.ToString().c_str()); + } + { + grpc_core::ApplicationCallbackExecCtx app_ctx; + grpc_core::ExecCtx exec_ctx; + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status); + } + // For the ref taken in EndpointWrite + eeep->wrapper->Unref(); + }, + write_buffer, &write_args); + + eeep->wrapper->ShutdownUnref(); +} + +void EndpointAddToPollset(grpc_endpoint* /* ep */, + grpc_pollset* /* pollset */) {} +void EndpointAddToPollsetSet(grpc_endpoint* /* ep */, + grpc_pollset_set* /* pollset */) {} +void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */, + grpc_pollset_set* /* pollset */) {} +/// After shutdown, all endpoint operations except destroy are no-op, +/// and will return some kind of sane default (empty strings, nullptrs, etc). +/// It is the caller's responsibility to ensure that calls to EndpointShutdown +/// are synchronized. +void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) { + auto* eeep = + reinterpret_cast( + ep); + if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) { + gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper, + why.ToString().c_str()); + } + GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper, + why.ToString().c_str()); + eeep->wrapper->TriggerShutdown(); +} + +// Attempts to free the underlying data structures. +void EndpointDestroy(grpc_endpoint* ep) { + auto* eeep = + reinterpret_cast( + ep); + eeep->wrapper->Unref(); + GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper); +} + +absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) { + auto* eeep = + reinterpret_cast( + ep); + return eeep->wrapper->PeerAddress(); +} + +absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) { + auto* eeep = + reinterpret_cast( + ep); + return eeep->wrapper->LocalAddress(); +} + +int EndpointGetFd(grpc_endpoint* ep) { + auto* eeep = + reinterpret_cast( + ep); + return eeep->wrapper->Fd(); +} + +bool EndpointCanTrackErr(grpc_endpoint* /* ep */) { return false; } + +grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = { + EndpointRead, + EndpointWrite, + EndpointAddToPollset, + EndpointAddToPollsetSet, + EndpointDeleteFromPollsetSet, + EndpointShutdown, + EndpointDestroy, + EndpointGetPeerAddress, + EndpointGetLocalAddress, + EndpointGetFd, + EndpointCanTrackErr}; + +EventEngineEndpointWrapper::EventEngineEndpointWrapper( + std::unique_ptr endpoint) + : endpoint_(std::move(endpoint)), + eeep_(std::make_unique()) { + eeep_->base.vtable = &grpc_event_engine_endpoint_vtable; + eeep_->wrapper = this; + auto local_addr = ResolvedAddressToURI(endpoint_->GetLocalAddress()); + if (local_addr.ok()) { + local_address_ = *local_addr; + } + auto peer_addr = ResolvedAddressToURI(endpoint_->GetPeerAddress()); + if (peer_addr.ok()) { + peer_address_ = *peer_addr; + } +#ifdef GRPC_POSIX_SOCKET_TCP + fd_ = reinterpret_cast(endpoint_.get()) + ->GetWrappedFd(); +#else // GRPC_POSIX_SOCKET_TCP + fd_ = -1; +#endif // GRPC_POSIX_SOCKET_TCP + GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper); +} + +} // namespace + +grpc_endpoint* grpc_event_engine_endpoint_create( + std::unique_ptr ee_endpoint) { + GPR_DEBUG_ASSERT(ee_endpoint != nullptr); + auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint)); + return wrapper->GetGrpcEndpoint(); +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/event_engine_shims/endpoint.h b/src/core/lib/iomgr/event_engine_shims/endpoint.h new file mode 100644 index 00000000000..430cc8cddf0 --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/endpoint.h @@ -0,0 +1,34 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H +#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H +#include + +#include + +#include "src/core/lib/iomgr/endpoint.h" + +namespace grpc_event_engine { +namespace experimental { + +/// Creates an internal grpc_endpoint struct from an EventEngine Endpoint. +/// Server code needs to create grpc_endpoints after the EventEngine has made +/// connections. +grpc_endpoint* grpc_event_engine_endpoint_create( + std::unique_ptr ee_endpoint); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H diff --git a/src/core/lib/iomgr/event_engine_shims/tcp_client.cc b/src/core/lib/iomgr/event_engine_shims/tcp_client.cc new file mode 100644 index 00000000000..fead5a0bb37 --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/tcp_client.cc @@ -0,0 +1,82 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h" + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" + +#include +#include + +#include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/resolved_address_internal.h" +#include "src/core/lib/event_engine/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "src/core/lib/transport/error_utils.h" + +namespace grpc_event_engine { +namespace experimental { + +int64_t event_engine_tcp_client_connect( + grpc_closure* on_connect, grpc_endpoint** endpoint, + const grpc_event_engine::experimental::EndpointConfig& config, + const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { + auto resource_quota = reinterpret_cast( + config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA)); + auto addr_uri = grpc_sockaddr_to_uri(addr); + EventEngine::ConnectionHandle handle = GetDefaultEventEngine()->Connect( + [on_connect, + endpoint](absl::StatusOr> ep) { + grpc_core::ApplicationCallbackExecCtx app_ctx; + grpc_core::ExecCtx exec_ctx; + absl::Status conn_status = ep.ok() ? absl::OkStatus() : ep.status(); + if (ep.ok()) { + *endpoint = grpc_event_engine_endpoint_create(std::move(*ep)); + } else { + *endpoint = nullptr; + } + GRPC_EVENT_ENGINE_TRACE("EventEngine::Connect Status: %s", + ep.status().ToString().c_str()); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_connect, + absl_status_to_grpc_error(conn_status)); + }, + CreateResolvedAddress(*addr), config, + resource_quota != nullptr + ? resource_quota->memory_quota()->CreateMemoryOwner( + absl::StrCat("tcp-client:", addr_uri.value())) + : grpc_event_engine::experimental::MemoryAllocator(), + std::max(grpc_core::Duration::Milliseconds(1), + deadline - grpc_core::Timestamp::Now())); + GRPC_EVENT_ENGINE_TRACE("EventEngine::Connect Peer: %s, handle: %" PRId64, + (*addr_uri).c_str(), + static_cast(handle.keys[0])); + return handle.keys[0]; +} + +bool event_engine_tcp_client_cancel_connect(int64_t connection_handle) { + GRPC_EVENT_ENGINE_TRACE("EventEngine::CancelConnect handle: %" PRId64, + connection_handle); + return GetDefaultEventEngine()->CancelConnect( + {static_cast(connection_handle), 0}); +} +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/iomgr/event_engine_shims/tcp_client.h b/src/core/lib/iomgr/event_engine_shims/tcp_client.h new file mode 100644 index 00000000000..80f1c0cabc8 --- /dev/null +++ b/src/core/lib/iomgr/event_engine_shims/tcp_client.h @@ -0,0 +1,44 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H +#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H +#include + +#include + +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/resolved_address.h" + +namespace grpc_event_engine { +namespace experimental { + +/// Attempts to use event engine to connect to the specified remote address and +/// invokes the on_connect callback asynchronously upon connection +/// establishment, failure or timeout. It returns a 64 bit connection handle +/// which can later be used to cancel an in progress connection attempt. +int64_t event_engine_tcp_client_connect(grpc_closure* on_connect, + grpc_endpoint** endpoint, + const EndpointConfig& config, + const grpc_resolved_address* addr, + grpc_core::Timestamp deadline); + +/// Attempts to cancel an in progress connection attempt represented by the +/// passed in connection handle. It returns true if the cancellation attempt +/// succeeded. Otherwise it returns false. +bool event_engine_tcp_client_cancel_connect(int64_t connection_handle); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 96ccdf298cd..5a7c77ae8fe 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -18,6 +18,7 @@ #include +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_CLIENT @@ -35,9 +36,12 @@ #include #include "src/core/lib/address_utils/sockaddr_utils.h" +#include "src/core/lib/event_engine/resolved_address_internal.h" +#include "src/core/lib/event_engine/shim.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/sockaddr.h" @@ -400,6 +404,10 @@ static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep, const EndpointConfig& config, const grpc_resolved_address* addr, grpc_core::Timestamp deadline) { + if (grpc_event_engine::experimental::UseEventEngineClient()) { + return grpc_event_engine::experimental::event_engine_tcp_client_connect( + closure, ep, config, addr, deadline); + } grpc_resolved_address mapped_addr; grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config)); int fd = -1; @@ -415,6 +423,10 @@ static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep, } static bool tcp_cancel_connect(int64_t connection_handle) { + if (grpc_event_engine::experimental::UseEventEngineClient()) { + return grpc_event_engine::experimental:: + event_engine_tcp_client_cancel_connect(connection_handle); + } if (connection_handle <= 0) { return false; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 652e4c2d118..bb6171a6f70 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -495,6 +495,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc', 'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc', 'src/core/lib/event_engine/resolved_address.cc', + 'src/core/lib/event_engine/shim.cc', 'src/core/lib/event_engine/slice.cc', 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', @@ -578,6 +579,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/ev_poll_posix.cc', 'src/core/lib/iomgr/ev_posix.cc', 'src/core/lib/iomgr/ev_windows.cc', + 'src/core/lib/iomgr/event_engine_shims/endpoint.cc', + 'src/core/lib/iomgr/event_engine_shims/tcp_client.cc', 'src/core/lib/iomgr/exec_ctx.cc', 'src/core/lib/iomgr/executor.cc', 'src/core/lib/iomgr/fork_posix.cc', diff --git a/test/core/event_engine/posix/posix_endpoint_test.cc b/test/core/event_engine/posix/posix_endpoint_test.cc index cb2e5534088..8dabaf402dc 100644 --- a/test/core/event_engine/posix/posix_endpoint_test.cc +++ b/test/core/event_engine/posix/posix_endpoint_test.cc @@ -41,6 +41,7 @@ #include "src/core/lib/event_engine/posix_engine/posix_engine.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/dual_ref_counted.h" #include "src/core/lib/gprpp/global_config.h" #include "src/core/lib/gprpp/memory.h" @@ -81,8 +82,8 @@ std::list CreateConnectedEndpoints( auto memory_quota = std::make_unique("bar"); std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EventEngine::ResolvedAddress resolved_addr = - URIToResolvedAddress(target_addr); + auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); std::unique_ptr server_endpoint; grpc_core::Notification* server_signal = new grpc_core::Notification(); @@ -108,12 +109,12 @@ std::list CreateConnectedEndpoints( std::make_unique("foo")); GPR_ASSERT(listener.ok()); - EXPECT_TRUE((*listener)->Bind(resolved_addr).ok()); + EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok()); EXPECT_TRUE((*listener)->Start().ok()); // Create client socket and connect to the target address. for (int i = 0; i < num_connections; ++i) { - int client_fd = ConnectToServerOrDie(resolved_addr); + int client_fd = ConnectToServerOrDie(*resolved_addr); EventHandle* handle = poller.CreateHandle(client_fd, "test", poller.CanTrackErrors()); EXPECT_NE(handle, nullptr); diff --git a/test/core/event_engine/posix/posix_event_engine_connect_test.cc b/test/core/event_engine/posix/posix_event_engine_connect_test.cc index 02b3abf4e0a..8dc6a48dfae 100644 --- a/test/core/event_engine/posix/posix_event_engine_connect_test.cc +++ b/test/core/event_engine/posix/posix_event_engine_connect_test.cc @@ -147,10 +147,11 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); std::shared_ptr posix_ee = std::make_shared(); std::string resolved_addr_str = - ResolvedAddressToNormalizedString(resolved_addr).value(); - auto sockets = CreateConnectedSockets(resolved_addr); + ResolvedAddressToNormalizedString(*resolved_addr).value(); + auto sockets = CreateConnectedSockets(*resolved_addr); grpc_core::Notification signal; grpc_core::ChannelArgs args; auto quota = grpc_core::ResourceQuota::Default(); @@ -162,8 +163,8 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) { EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown); signal.Notify(); }, - URIToResolvedAddress(target_addr), config, - memory_quota->CreateMemoryAllocator("conn-1"), 3s); + *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"), + 3s); signal.WaitForNotification(); for (auto sock : sockets) { close(sock); @@ -175,10 +176,11 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); std::shared_ptr posix_ee = std::make_shared(); std::string resolved_addr_str = - ResolvedAddressToNormalizedString(resolved_addr).value(); - auto sockets = CreateConnectedSockets(resolved_addr); + ResolvedAddressToNormalizedString(*resolved_addr).value(); + auto sockets = CreateConnectedSockets(*resolved_addr); grpc_core::ChannelArgs args; auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); @@ -189,8 +191,8 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) { FAIL() << "The on_connect callback should not have run since the " "connection attempt was cancelled."; }, - URIToResolvedAddress(target_addr), config, - memory_quota->CreateMemoryAllocator("conn-2"), 3s); + *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"), + 3s); if (connection_handle.keys[0] > 0) { ASSERT_TRUE(posix_ee->CancelConnect(connection_handle)); } diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index 7c998bcd3cd..6a39da640e2 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -39,6 +39,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/memory_quota.h" @@ -83,7 +84,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { ASSERT_FALSE(status.ok()); signal.Notify(); }, - URIToResolvedAddress(target_addr), config, + *URIToResolvedAddress(target_addr), config, memory_quota->CreateMemoryAllocator("conn-1"), 24h); signal.WaitForNotification(); WaitForSingleOwner(std::move(test_ee)); @@ -101,6 +102,8 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { auto memory_quota = std::make_unique("bar"); std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); std::unique_ptr client_endpoint; std::unique_ptr server_endpoint; grpc_core::Notification client_signal; @@ -123,7 +126,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(*resolved_addr).ok()); ASSERT_TRUE(listener->Start().ok()); test_ee->Connect( @@ -133,8 +136,8 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_endpoint = std::move(*endpoint); client_signal.Notify(); }, - URIToResolvedAddress(target_addr), config, - memory_quota->CreateMemoryAllocator("conn-1"), 24h); + *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"), + 24h); client_signal.WaitForNotification(); server_signal.WaitForNotification(); @@ -195,7 +198,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } ASSERT_TRUE(listener->Start().ok()); @@ -217,7 +220,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { client_endpoint = std::move(*endpoint); client_signal.Notify(); }, - URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), client_config, memory_quota->CreateMemoryAllocator( absl::StrCat("conn-", std::to_string(i))), diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.cc b/test/core/event_engine/test_suite/event_engine_test_utils.cc index d6eeafff2d4..79aa00c07e5 100644 --- a/test/core/event_engine/test_suite/event_engine_test_utils.cc +++ b/test/core/event_engine/test_suite/event_engine_test_utils.cc @@ -35,12 +35,10 @@ #include #include -#include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/notification.h" -#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/resource_quota/memory_quota.h" -#include "src/core/lib/uri/uri_parser.h" // IWYU pragma: no_include @@ -84,19 +82,6 @@ void WaitForSingleOwner(std::shared_ptr&& engine) { } } -EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) { - grpc_resolved_address addr; - absl::StatusOr uri = grpc_core::URI::Parse(address_str); - if (!uri.ok()) { - gpr_log(GPR_ERROR, "Failed to parse. Error: %s", - uri.status().ToString().c_str()); - GPR_ASSERT(uri.ok()); - } - GPR_ASSERT(grpc_parse_uri(*uri, &addr)); - return EventEngine::ResolvedAddress( - reinterpret_cast(addr.addr), addr.len); -} - void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data) { buf->Append(Slice::FromCopiedString(data)); } @@ -201,7 +186,7 @@ absl::Status ConnectionManager::BindAndStartListener( std::shared_ptr listener((*status).release()); for (auto& addr : addrs) { - auto bind_status = listener->Bind(URIToResolvedAddress(addr)); + auto bind_status = listener->Bind(*URIToResolvedAddress(addr)); if (!bind_status.ok()) { gpr_log(GPR_ERROR, "Binding listener failed: %s", bind_status.status().ToString().c_str()); @@ -239,7 +224,7 @@ ConnectionManager::CreateConnection(std::string target_addr, last_in_progress_connection_.SetClientEndpoint(std::move(*status)); } }, - URIToResolvedAddress(target_addr), config, + *URIToResolvedAddress(target_addr), config, memory_quota_->CreateMemoryAllocator(conn_name), timeout); auto client_endpoint = last_in_progress_connection_.GetClientEndpoint(); diff --git a/test/core/event_engine/test_suite/event_engine_test_utils.h b/test/core/event_engine/test_suite/event_engine_test_utils.h index 58a6836ab74..2da9acfacb4 100644 --- a/test/core/event_engine/test_suite/event_engine_test_utils.h +++ b/test/core/event_engine/test_suite/event_engine_test_utils.h @@ -43,8 +43,6 @@ void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data); std::string ExtractSliceBufferIntoString(SliceBuffer* buf); -EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str); - // Returns a random message with bounded length. std::string GetNextSendMessage(); diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index 84c322da9b8..ae0636b1384 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -35,9 +35,11 @@ #include #include #include +#include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/resource_quota/memory_quota.h" @@ -75,6 +77,8 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { auto memory_quota = std::make_unique("bar"); std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); std::unique_ptr client_endpoint; std::unique_ptr server_endpoint; grpc_core::Notification client_signal; @@ -96,7 +100,7 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(*resolved_addr).ok()); ASSERT_TRUE(listener->Start().ok()); oracle_ee->Connect( @@ -106,8 +110,8 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { client_endpoint = std::move(*endpoint); client_signal.Notify(); }, - URIToResolvedAddress(target_addr), config, - memory_quota->CreateMemoryAllocator("conn-1"), 24h); + *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"), + 24h); client_signal.WaitForNotification(); server_signal.WaitForNotification(); @@ -169,7 +173,7 @@ TEST_F(EventEngineServerTest, for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } ASSERT_TRUE(listener->Start().ok()); @@ -191,7 +195,7 @@ TEST_F(EventEngineServerTest, client_endpoint = std::move(*endpoint); client_signal.Notify(); }, - URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + *URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), client_config, memory_quota->CreateMemoryAllocator( absl::StrCat("conn-", std::to_string(i))), diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 830871407e3..266d9425674 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -61,7 +61,10 @@ grpc_cc_test( srcs = ["endpoint_pair_test.cc"], external_deps = ["gtest"], language = "C++", - tags = ["endpoint_test"], + tags = [ + "endpoint_test", + "event_engine_client_test", + ], deps = [ ":endpoint_tests", "//:gpr", @@ -245,6 +248,7 @@ grpc_cc_test( language = "C++", tags = [ "endpoint_test", + "event_engine_client_test", "no_windows", ], deps = [ diff --git a/test/core/iomgr/endpoint_pair_test.cc b/test/core/iomgr/endpoint_pair_test.cc index 9c06c6cbce5..d454fa24992 100644 --- a/test/core/iomgr/endpoint_pair_test.cc +++ b/test/core/iomgr/endpoint_pair_test.cc @@ -18,18 +18,94 @@ #include "src/core/lib/iomgr/endpoint_pair.h" +#include + #include +#include #include #include #include #include +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/event_engine/shim.h" +#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/notification.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" +#include "src/core/lib/resource_quota/memory_quota.h" #include "test/core/iomgr/endpoint_tests.h" +#include "test/core/util/port.h" #include "test/core/util/test_config.h" +using namespace std::chrono_literals; + +namespace { + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::URIToResolvedAddress; + +grpc_endpoint_pair grpc_iomgr_event_engine_shim_endpoint_pair( + grpc_channel_args* c_args) { + grpc_core::ExecCtx ctx; + grpc_endpoint_pair p; + auto ee = grpc_event_engine::experimental::GetDefaultEventEngine(); + auto memory_quota = std::make_unique("bar"); + std::string target_addr = absl::StrCat( + "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + auto resolved_addr = URIToResolvedAddress(target_addr); + GPR_ASSERT(resolved_addr.ok()); + std::unique_ptr client_endpoint; + std::unique_ptr server_endpoint; + grpc_core::Notification client_signal; + grpc_core::Notification server_signal; + + EventEngine::Listener::AcceptCallback accept_cb = + [&server_endpoint, &server_signal]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint = std::move(ep); + server_signal.Notify(); + }; + + auto args = grpc_core::CoreConfiguration::Get() + .channel_args_preconditioning() + .PreconditionChannelArgs(c_args); + ChannelArgsEndpointConfig config(args); + auto listener = *ee->CreateListener( + std::move(accept_cb), [](absl::Status /*status*/) {}, config, + std::make_unique("foo")); + + GPR_ASSERT(listener->Bind(*resolved_addr).ok()); + GPR_ASSERT(listener->Start().ok()); + + ee->Connect( + [&client_endpoint, &client_signal]( + absl::StatusOr> endpoint) { + GPR_ASSERT(endpoint.ok()); + client_endpoint = std::move(*endpoint); + client_signal.Notify(); + }, + *resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"), + 24h); + + client_signal.WaitForNotification(); + server_signal.WaitForNotification(); + + p.client = grpc_event_engine::experimental::grpc_event_engine_endpoint_create( + std::move(client_endpoint)); + p.server = grpc_event_engine::experimental::grpc_event_engine_endpoint_create( + std::move(server_endpoint)); + return p; +} + +} // namespace + static gpr_mu* g_mu; static grpc_pollset* g_pollset; @@ -44,7 +120,12 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair( a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = static_cast(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", &args); + grpc_endpoint_pair p; + if (grpc_event_engine::experimental::UseEventEngineClient()) { + p = grpc_iomgr_event_engine_shim_endpoint_pair(&args); + } else { + p = grpc_iomgr_create_endpoint_pair("test", &args); + } f.client_ep = p.client; f.server_ep = p.server; grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index 77d36c82c60..1ff46da9765 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -30,6 +30,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/time.h" +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/util/test_config.h" @@ -129,26 +130,34 @@ static void read_scheduler(void* data, grpc_error_handle /* error */) { /*urgent=*/false, /*min_progress_size=*/1); } +static void read_and_write_test_read_handler_read_done( + read_and_write_test_state* state, int read_done_state) { + gpr_log(GPR_DEBUG, "Read handler done"); + gpr_mu_lock(g_mu); + state->read_done = read_done_state; + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)); + gpr_mu_unlock(g_mu); +} + static void read_and_write_test_read_handler(void* data, grpc_error_handle error) { struct read_and_write_test_state* state = static_cast(data); - + if (!error.ok()) { + read_and_write_test_read_handler_read_done(state, 1); + return; + } state->bytes_read += count_slices( state->incoming.slices, state->incoming.count, &state->current_read_data); - if (state->bytes_read == state->target_bytes || !error.ok()) { - gpr_log(GPR_DEBUG, "Read handler done"); - gpr_mu_lock(g_mu); - state->read_done = 1 + (error.ok()); - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)); - gpr_mu_unlock(g_mu); - } else if (error.ok()) { - // We perform many reads one after another. If grpc_endpoint_read and the - // read_handler are both run inline, we might end up growing the stack - // beyond the limit. Schedule the read on ExecCtx to avoid this. - grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler, - absl::OkStatus()); + if (state->bytes_read == state->target_bytes) { + read_and_write_test_read_handler_read_done(state, 2); + return; } + // We perform many reads one after another. If grpc_endpoint_read and the + // read_handler are both run inline, we might end up growing the stack + // beyond the limit. Schedule the read on ExecCtx to avoid this. + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler, + absl::OkStatus()); } static void write_scheduler(void* data, grpc_error_handle /* error */) { diff --git a/test/core/util/BUILD b/test/core/util/BUILD index dfd9a925b2c..9207d0d0f07 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -304,6 +304,7 @@ grpc_cc_library( "//:grpc", "//:grpc_client_channel", "//:orphanable", + "//:parse_address", "//:ref_counted_ptr", "//:server_address", "//:uri_parser", diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 5a03968ae3a..1ac978c8d07 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2066,6 +2066,9 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \ src/core/lib/event_engine/resolved_address.cc \ +src/core/lib/event_engine/resolved_address_internal.h \ +src/core/lib/event_engine/shim.cc \ +src/core/lib/event_engine/shim.h \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ @@ -2233,6 +2236,10 @@ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/ev_windows.cc \ +src/core/lib/iomgr/event_engine_shims/endpoint.cc \ +src/core/lib/iomgr/event_engine_shims/endpoint.h \ +src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ +src/core/lib/iomgr/event_engine_shims/tcp_client.h \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 9a94bf96246..8a6f4888d77 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1845,6 +1845,9 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \ src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \ src/core/lib/event_engine/resolved_address.cc \ +src/core/lib/event_engine/resolved_address_internal.h \ +src/core/lib/event_engine/shim.cc \ +src/core/lib/event_engine/shim.h \ src/core/lib/event_engine/slice.cc \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/socket_notifier.h \ @@ -2015,6 +2018,10 @@ src/core/lib/iomgr/ev_poll_posix.h \ src/core/lib/iomgr/ev_posix.cc \ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/ev_windows.cc \ +src/core/lib/iomgr/event_engine_shims/endpoint.cc \ +src/core/lib/iomgr/event_engine_shims/endpoint.h \ +src/core/lib/iomgr/event_engine_shims/tcp_client.cc \ +src/core/lib/iomgr/event_engine_shims/tcp_client.h \ src/core/lib/iomgr/exec_ctx.cc \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.cc \