[EventEngine] Modify iomgr to allow creation of event engine clients and client side endpoints (#31661)

* [WIP] EventEngine iomgr endpoint shims

* [WIP] EventEngine::Endpoint iomgr shims for the PosixEventEngine

* Util functions to help with posix engine listener implementation

* sanity

* update comments in posix_engine_listener_utils.h

* review comments

* iwyu

* revert prev commit

* iwyu

* update build

* update

* regenerate projects

* regenerate projects

* minor fixes

* update BUILD

* sanity

* update build

* regenerate projects

* fix unused parameter

* sanity

* update

* sanity

* regenerate_projects

* remove unused variable

* start

* update

* regenerate_projects

* sanity

* update

* fixes

* update

* regenerate_projects

* update

* fix sanity and msan failure

* more fixes

* build failure

* update

* fix

* sanity

* fixes

* update

* regenerate projects

* fix sanity

* review comments

* An EventEngine subclass to be implemented by all posix based event engines

* sanity

* comments

* update

* review comments

* re-word

* fix

* update

* review comments

* regenerate projects

* syntax fix

* add lock free event benchmark

* releasable mutex lock

* fix build isue

* update

* start

* regenerate projects

* update

* fix

* windows build

* update

* windows portability issue

* update

* update

* update

* update

* format

* update

* update

* update

* update

* update

* fix sanity

* regenerate projects

* update

* iwyu

* Fix resolved address length related bugs in tcp_socket_utils and listener_utils

* iwyu

* cleanup src/core/lib/event_engine/tcp_socket_utils.cc

* iwyu

* fix

* regenerate projects

* fix sanity

* re-write endpoint shim

* more re-write

* cleanup

* update

* review comments

* build issue

* more build issue fixes plus adding event_engine_trace

* even more build issue fixes

* iwyu

* add static_cast

* update

* remove redundant code

* update

* deduplicate

* iwyu

* Fix review comments and regenerate_projects

* sanity

* review comments

* fix include guards

Co-authored-by: AJ Heller <hork@google.com>
pull/32174/head
Vignesh Babu 2 years ago committed by GitHub
parent 2f0734272d
commit 8cf04e9a54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      BUILD
  2. 12
      CMakeLists.txt
  3. 6
      Makefile
  4. 31
      build_autogenerated.yaml
  5. 4
      config.m4
  6. 4
      config.w32
  7. 8
      gRPC-C++.podspec
  8. 11
      gRPC-Core.podspec
  9. 7
      grpc.gemspec
  10. 9
      grpc.gyp
  11. 12
      include/grpc/event_engine/slice_buffer.h
  12. 7
      package.xml
  13. 30
      src/core/BUILD
  14. 5
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  15. 12
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  16. 19
      src/core/lib/event_engine/resolved_address.cc
  17. 34
      src/core/lib/event_engine/resolved_address_internal.h
  18. 36
      src/core/lib/event_engine/shim.cc
  19. 27
      src/core/lib/event_engine/shim.h
  20. 16
      src/core/lib/event_engine/tcp_socket_utils.cc
  21. 5
      src/core/lib/event_engine/tcp_socket_utils.h
  22. 388
      src/core/lib/iomgr/event_engine_shims/endpoint.cc
  23. 34
      src/core/lib/iomgr/event_engine_shims/endpoint.h
  24. 82
      src/core/lib/iomgr/event_engine_shims/tcp_client.cc
  25. 44
      src/core/lib/iomgr/event_engine_shims/tcp_client.h
  26. 12
      src/core/lib/iomgr/tcp_client_posix.cc
  27. 3
      src/python/grpcio/grpc_core_dependencies.py
  28. 9
      test/core/event_engine/posix/posix_endpoint_test.cc
  29. 18
      test/core/event_engine/posix/posix_event_engine_connect_test.cc
  30. 15
      test/core/event_engine/test_suite/client_test.cc
  31. 21
      test/core/event_engine/test_suite/event_engine_test_utils.cc
  32. 2
      test/core/event_engine/test_suite/event_engine_test_utils.h
  33. 14
      test/core/event_engine/test_suite/server_test.cc
  34. 6
      test/core/iomgr/BUILD
  35. 83
      test/core/iomgr/endpoint_pair_test.cc
  36. 27
      test/core/iomgr/endpoint_tests.cc
  37. 1
      test/core/util/BUILD
  38. 7
      tools/doxygen/Doxyfile.c++.internal
  39. 7
      tools/doxygen/Doxyfile.core.internal

54
BUILD

@ -1173,7 +1173,6 @@ grpc_cc_library(
grpc_cc_library(
name = "grpc_base",
srcs = [
"//src/core:lib/address_utils/parse_address.cc",
"//src/core:lib/channel/channel_stack.cc",
"//src/core:lib/channel/channel_stack_builder_impl.cc",
"//src/core:lib/channel/channel_trace.cc",
@ -1204,8 +1203,6 @@ grpc_cc_library(
"//src/core:lib/iomgr/gethostname_fallback.cc",
"//src/core:lib/iomgr/gethostname_host_name_max.cc",
"//src/core:lib/iomgr/gethostname_sysconf.cc",
"//src/core:lib/iomgr/grpc_if_nametoindex_posix.cc",
"//src/core:lib/iomgr/grpc_if_nametoindex_unsupported.cc",
"//src/core:lib/iomgr/internal_errqueue.cc",
"//src/core:lib/iomgr/iocp_windows.cc",
"//src/core:lib/iomgr/iomgr.cc",
@ -1273,9 +1270,15 @@ grpc_cc_library(
"//src/core:lib/transport/timeout_encoding.cc",
"//src/core:lib/transport/transport.cc",
"//src/core:lib/transport/transport_op_string.cc",
] +
# TODO(vigneshbabu): remove these
# These headers used to be vended by this target, but they have to be
# removed after landing event engine.
[
"//src/core:lib/iomgr/event_engine_shims/endpoint.cc",
"//src/core:lib/iomgr/event_engine_shims/tcp_client.cc",
],
hdrs = [
"//src/core:lib/address_utils/parse_address.h",
"//src/core:lib/channel/call_finalization.h",
"//src/core:lib/channel/call_tracer.h",
"//src/core:lib/channel/channel_stack.h",
@ -1303,7 +1306,6 @@ grpc_cc_library(
"//src/core:lib/iomgr/ev_poll_posix.h",
"//src/core:lib/iomgr/ev_posix.h",
"//src/core:lib/iomgr/gethostname.h",
"//src/core:lib/iomgr/grpc_if_nametoindex.h",
"//src/core:lib/iomgr/internal_errqueue.h",
"//src/core:lib/iomgr/iocp_windows.h",
"//src/core:lib/iomgr/iomgr.h",
@ -1358,6 +1360,13 @@ grpc_cc_library(
"//src/core:lib/transport/timeout_encoding.h",
"//src/core:lib/transport/transport.h",
"//src/core:lib/transport/transport_impl.h",
] +
# TODO(vigneshbabu): remove these
# These headers used to be vended by this target, but they have to be
# removed after landing event engine.
[
"//src/core:lib/iomgr/event_engine_shims/endpoint.h",
"//src/core:lib/iomgr/event_engine_shims/tcp_client.h",
],
defines = select({
"systemd": ["HAVE_LIBSYSTEMD"],
@ -1399,6 +1408,7 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"orphanable",
"parse_address",
"promise",
"ref_counted_ptr",
"sockaddr_utils",
@ -1422,6 +1432,10 @@ grpc_cc_library(
"//src/core:default_event_engine",
"//src/core:dual_ref_counted",
"//src/core:error",
"//src/core:event_engine_common",
"//src/core:event_engine_shim",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:event_engine_trace",
"//src/core:event_log",
"//src/core:experiments",
"//src/core:gpr_atm",
@ -1442,6 +1456,7 @@ grpc_cc_library(
"//src/core:pipe",
"//src/core:poll",
"//src/core:pollset_set",
"//src/core:posix_event_engine_base_hdrs",
"//src/core:promise_status",
"//src/core:ref_counted",
"//src/core:resolved_address",
@ -2484,6 +2499,34 @@ grpc_cc_library(
deps = ["gpr"],
)
grpc_cc_library(
name = "parse_address",
srcs = [
"//src/core:lib/address_utils/parse_address.cc",
"//src/core:lib/iomgr/grpc_if_nametoindex_posix.cc",
"//src/core:lib/iomgr/grpc_if_nametoindex_unsupported.cc",
],
hdrs = [
"//src/core:lib/address_utils/parse_address.h",
"//src/core:lib/iomgr/grpc_if_nametoindex.h",
],
external_deps = [
"absl/status",
"absl/status:statusor",
"absl/strings",
],
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"gpr",
"uri_parser",
"//src/core:error",
"//src/core:grpc_sockaddr",
"//src/core:iomgr_port",
"//src/core:resolved_address",
"//src/core:status_helper",
],
)
grpc_cc_library(
name = "backoff",
srcs = [
@ -2811,6 +2854,7 @@ grpc_cc_library(
"grpc_trace",
"iomgr_timer",
"orphanable",
"parse_address",
"ref_counted_ptr",
"server_address",
"sockaddr_utils",

12
CMakeLists.txt generated

@ -2175,6 +2175,7 @@ add_library(grpc
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/shim.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
@ -2215,6 +2216,8 @@ add_library(grpc
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/fork_posix.cc
@ -2848,6 +2851,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/shim.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
@ -2887,6 +2891,8 @@ add_library(grpc_unsecure
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/fork_posix.cc
@ -4336,6 +4342,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/shim.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
@ -4372,6 +4379,8 @@ add_library(grpc_authorization_provider
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/fork_posix.cc
@ -11396,6 +11405,7 @@ add_executable(frame_test
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
src/core/lib/event_engine/resolved_address.cc
src/core/lib/event_engine/shim.cc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
@ -11432,6 +11442,8 @@ add_executable(frame_test
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/executor.cc
src/core/lib/iomgr/fork_posix.cc

6
Makefile generated

@ -1438,6 +1438,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/shim.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
@ -1478,6 +1479,8 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/fork_posix.cc \
@ -1970,6 +1973,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/shim.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
@ -2009,6 +2013,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/fork_posix.cc \

@ -801,6 +801,8 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
@ -860,6 +862,8 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
@ -1562,6 +1566,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/shim.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
@ -1602,6 +1607,8 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/fork_posix.cc
@ -2116,6 +2123,8 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
@ -2174,6 +2183,8 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
@ -2497,6 +2508,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/shim.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
@ -2536,6 +2548,8 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/fork_posix.cc
@ -3557,6 +3571,8 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
@ -3611,6 +3627,8 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
@ -3816,6 +3834,7 @@ libs:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/shim.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
@ -3852,6 +3871,8 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/fork_posix.cc
@ -7339,6 +7360,8 @@ targets:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool.h
@ -7393,6 +7416,8 @@ targets:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/executor.h
- src/core/lib/iomgr/gethostname.h
@ -7580,6 +7605,7 @@ targets:
- src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc
- src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc
- src/core/lib/event_engine/resolved_address.cc
- src/core/lib/event_engine/shim.cc
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
@ -7616,6 +7642,8 @@ targets:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/executor.cc
- src/core/lib/iomgr/fork_posix.cc
@ -11493,6 +11521,9 @@ targets:
language: c++
headers:
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/resolved_address_internal.h
- src/core/lib/iomgr/port.h
- src/core/lib/iomgr/resolved_address.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_buffer.h
- src/core/lib/slice/slice_internal.h

4
config.m4 generated

@ -520,6 +520,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/shim.cc \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
@ -603,6 +604,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/executor.cc \
src/core/lib/iomgr/fork_posix.cc \
@ -1395,6 +1398,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/handshaker)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/http)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/iomgr/event_engine_shims)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/json)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/load_balancing)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/matchers)

4
config.w32 generated

@ -486,6 +486,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_pipe.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\wakeup_fd_posix_default.cc " +
"src\\core\\lib\\event_engine\\resolved_address.cc " +
"src\\core\\lib\\event_engine\\shim.cc " +
"src\\core\\lib\\event_engine\\slice.cc " +
"src\\core\\lib\\event_engine\\slice_buffer.cc " +
"src\\core\\lib\\event_engine\\tcp_socket_utils.cc " +
@ -569,6 +570,8 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\ev_poll_posix.cc " +
"src\\core\\lib\\iomgr\\ev_posix.cc " +
"src\\core\\lib\\iomgr\\ev_windows.cc " +
"src\\core\\lib\\iomgr\\event_engine_shims\\endpoint.cc " +
"src\\core\\lib\\iomgr\\event_engine_shims\\tcp_client.cc " +
"src\\core\\lib\\iomgr\\exec_ctx.cc " +
"src\\core\\lib\\iomgr\\executor.cc " +
"src\\core\\lib\\iomgr\\fork_posix.cc " +
@ -1525,6 +1528,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\handshaker");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\http");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\iomgr\\event_engine_shims");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\json");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\load_balancing");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\matchers");

8
gRPC-C++.podspec generated

@ -751,6 +751,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
@ -835,6 +837,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',
@ -1667,6 +1671,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
@ -1751,6 +1757,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',

11
gRPC-Core.podspec generated

@ -1161,6 +1161,9 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.cc',
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/socket_notifier.h',
@ -1328,6 +1331,10 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.cc',
@ -2339,6 +2346,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h',
'src/core/lib/event_engine/resolved_address_internal.h',
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_pool.h',
@ -2423,6 +2432,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',
'src/core/lib/iomgr/executor.h',
'src/core/lib/iomgr/gethostname.h',

7
grpc.gemspec generated

@ -1072,6 +1072,9 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h )
s.files += %w( src/core/lib/event_engine/resolved_address.cc )
s.files += %w( src/core/lib/event_engine/resolved_address_internal.h )
s.files += %w( src/core/lib/event_engine/shim.cc )
s.files += %w( src/core/lib/event_engine/shim.h )
s.files += %w( src/core/lib/event_engine/slice.cc )
s.files += %w( src/core/lib/event_engine/slice_buffer.cc )
s.files += %w( src/core/lib/event_engine/socket_notifier.h )
@ -1239,6 +1242,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/ev_posix.cc )
s.files += %w( src/core/lib/iomgr/ev_posix.h )
s.files += %w( src/core/lib/iomgr/ev_windows.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.h )
s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.h )
s.files += %w( src/core/lib/iomgr/exec_ctx.cc )
s.files += %w( src/core/lib/iomgr/exec_ctx.h )
s.files += %w( src/core/lib/iomgr/executor.cc )

9
grpc.gyp generated

@ -851,6 +851,7 @@
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/shim.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
@ -891,6 +892,8 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/fork_posix.cc',
@ -1325,6 +1328,7 @@
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/shim.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
@ -1364,6 +1368,8 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/fork_posix.cc',
@ -1824,6 +1830,7 @@
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/shim.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
@ -1860,6 +1867,8 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/fork_posix.cc',

@ -136,7 +136,19 @@ class SliceBuffer {
/// Return a pointer to the back raw grpc_slice_buffer
grpc_slice_buffer* c_slice_buffer() { return &slice_buffer_; }
// Returns a SliceBuffer that transfers slices into this new SliceBuffer,
// leaving the input parameter empty.
static SliceBuffer TakeCSliceBuffer(grpc_slice_buffer& slice_buffer) {
return SliceBuffer(&slice_buffer);
}
private:
// Transfers slices into this new SliceBuffer, leaving the parameter empty.
// Does not take ownership of the slice_buffer argument.
explicit SliceBuffer(grpc_slice_buffer* slice_buffer) {
grpc_slice_buffer_init(&slice_buffer_);
grpc_slice_buffer_swap(&slice_buffer_, slice_buffer);
}
/// The backing raw slice buffer.
grpc_slice_buffer slice_buffer_;
};

7
package.xml generated

@ -1054,6 +1054,9 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/resolved_address_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/shim.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/shim.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/slice_buffer.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/socket_notifier.h" role="src" />
@ -1221,6 +1224,10 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/endpoint.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/tcp_client.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/tcp_client.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/exec_ctx.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/executor.cc" role="src" />

@ -60,6 +60,7 @@ grpc_cc_library(
],
hdrs = [
"lib/event_engine/handle_containers.h",
"lib/event_engine/resolved_address_internal.h",
"//:include/grpc/event_engine/slice.h",
"//:include/grpc/event_engine/slice_buffer.h",
],
@ -70,6 +71,7 @@ grpc_cc_library(
"absl/utility",
],
deps = [
"resolved_address",
"slice",
"slice_buffer",
"slice_cast",
@ -953,6 +955,7 @@ grpc_cc_library(
"//:gpr",
"//:grpc_base",
"//:handshaker",
"//:parse_address",
"//:ref_counted_ptr",
"//:uri_parser",
],
@ -1872,11 +1875,11 @@ grpc_cc_library(
deps = [
"event_engine_common",
"event_engine_poller",
"event_engine_shim",
"event_engine_tcp_socket_utils",
"event_engine_thread_pool",
"event_engine_trace",
"event_engine_utils",
"experiments",
"init_internally",
"iomgr_port",
"posix_event_engine_base_hdrs",
@ -2011,10 +2014,12 @@ grpc_cc_library(
],
deps = [
"iomgr_port",
"resolved_address",
"status_helper",
"//:event_engine_base_hdrs",
"//:gpr",
"//:gpr_platform",
"//:parse_address",
"//:uri_parser",
],
)
@ -2034,6 +2039,21 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_shim",
srcs = [
"lib/event_engine/shim.cc",
],
hdrs = [
"lib/event_engine/shim.h",
],
deps = [
"experiments",
"iomgr_port",
"//:gpr_platform",
],
)
# NOTE: this target gets replaced inside Google's build system to be one that
# integrates with other internal systems better. Please do not rename or fold
# this into other targets.
@ -2634,6 +2654,7 @@ grpc_cc_library(
"//:grpc_credentials_util",
"//:grpc_security_base",
"//:grpc_trace",
"//:parse_address",
"//:promise",
"//:ref_counted_ptr",
"//:uri_parser",
@ -2768,6 +2789,7 @@ grpc_cc_library(
"//:grpc_client_channel",
"//:grpc_security_base",
"//:handshaker",
"//:parse_address",
"//:promise",
"//:ref_counted_ptr",
"//:sockaddr_utils",
@ -3153,6 +3175,7 @@ grpc_cc_library(
"resolved_address",
"//:gpr",
"//:grpc_base",
"//:parse_address",
"//:sockaddr_utils",
],
)
@ -3848,6 +3871,7 @@ grpc_cc_library(
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:parse_address",
"//:ref_counted_ptr",
"//:server_address",
"//:sockaddr_utils",
@ -3921,6 +3945,7 @@ grpc_cc_library(
"//:grpc_security_base",
"//:grpc_service_config_impl",
"//:grpc_trace",
"//:parse_address",
"//:ref_counted_ptr",
"//:sockaddr_utils",
"//:uri_parser",
@ -4630,6 +4655,7 @@ grpc_cc_library(
"//:grpc_base",
"//:grpc_public_hdrs",
"//:grpc_security_base",
"//:parse_address",
"//:promise",
"//:uri_parser",
],
@ -4731,9 +4757,9 @@ grpc_cc_library(
"resolved_address",
"//:config",
"//:gpr",
"//:grpc_base",
"//:grpc_resolver",
"//:orphanable",
"//:parse_address",
"//:server_address",
"//:uri_parser",
],

@ -684,14 +684,13 @@ class PosixEndpoint : public PosixEndpointWithFdSupport {
}
int GetWrappedFd() override {
GPR_ASSERT(false &&
grpc_core::Crash(
"PosixEndpoint::GetWrappedFd not supported on this platform");
}
void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) override {
GPR_ASSERT(false &&
"PosixEndpoint::Shutdown not supported on this platform");
grpc_core::Crash("PosixEndpoint::Shutdown not supported on this platform");
}
~PosixEndpoint() override = default;

@ -41,10 +41,10 @@
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
@ -332,7 +332,7 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (grpc_core::IsEventEngineClientEnabled()) {
if (UseEventEngineClient()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}
}
@ -341,7 +341,7 @@ PosixEventEngine::PosixEventEngine()
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (grpc_core::IsEventEngineClientEnabled()) {
if (UseEventEngineClient()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
@ -539,7 +539,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) {
if (!grpc_core::IsEventEngineClientEnabled()) {
if (!UseEventEngineClient()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
@ -564,7 +564,7 @@ std::unique_ptr<PosixEndpointWithFdSupport>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
if (!grpc_core::IsEventEngineClientEnabled()) {
if (!UseEventEngineClient()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
@ -613,7 +613,7 @@ PosixEventEngine::CreatePosixListener(
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
if (!grpc_core::IsEventEngineClientEnabled()) {
if (!UseEventEngineClient()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP

@ -14,11 +14,15 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/resolved_address.h"
#include <string.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/resolved_address_internal.h"
// IWYU pragma: no_include <sys/socket.h>
namespace grpc_event_engine {
@ -37,5 +41,20 @@ const struct sockaddr* EventEngine::ResolvedAddress::address() const {
socklen_t EventEngine::ResolvedAddress::size() const { return size_; }
EventEngine::ResolvedAddress CreateResolvedAddress(
const grpc_resolved_address& addr) {
return EventEngine::ResolvedAddress(
reinterpret_cast<const sockaddr*>(addr.addr), addr.len);
}
grpc_resolved_address CreateGRPCResolvedAddress(
const EventEngine::ResolvedAddress& ra) {
grpc_resolved_address grpc_addr;
memset(&grpc_addr, 0, sizeof(grpc_resolved_address));
memcpy(grpc_addr.addr, ra.address(), ra.size());
grpc_addr.len = ra.size();
return grpc_addr;
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,34 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H
#include <grpc/support/port_platform.h>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_event_engine {
namespace experimental {
EventEngine::ResolvedAddress CreateResolvedAddress(
const grpc_resolved_address& addr);
grpc_resolved_address CreateGRPCResolvedAddress(
const EventEngine::ResolvedAddress& ra);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_RESOLVED_ADDRESS_INTERNAL_H

@ -0,0 +1,36 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/port.h"
namespace grpc_event_engine {
namespace experimental {
bool UseEventEngineClient() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engine's become
// available for other platforms.
#ifdef GRPC_POSIX_SOCKET_TCP
return grpc_core::IsEventEngineClientEnabled();
#else
return false;
#endif
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,27 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {
bool UseEventEngineClient();
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_SHIM_H

@ -17,6 +17,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
@ -50,6 +51,7 @@
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_event_engine {
@ -368,5 +370,19 @@ absl::StatusOr<std::string> ResolvedAddressToURI(
return uri->ToString();
}
absl::StatusOr<EventEngine::ResolvedAddress> URIToResolvedAddress(
std::string address_str) {
grpc_resolved_address addr;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
if (!uri.ok()) {
gpr_log(GPR_ERROR, "Failed to parse URI. Error: %s",
uri.status().ToString().c_str());
}
GRPC_RETURN_IF_ERROR(uri.status());
GPR_ASSERT(grpc_parse_uri(*uri, &addr));
return EventEngine::ResolvedAddress(
reinterpret_cast<const sockaddr*>(addr.addr), addr.len);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -79,6 +79,11 @@ absl::StatusOr<std::string> ResolvedAddressToNormalizedString(
absl::StatusOr<std::string> ResolvedAddressToURI(
const EventEngine::ResolvedAddress& resolved_address);
// Given a URI string, returns the corresponding resolved address if the URI
// is valid. Otherwise it returns an appropriate error.
absl::StatusOr<EventEngine::ResolvedAddress> URIToResolvedAddress(
std::string address_str);
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,388 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include <atomic>
#include <memory>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/slice.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/error_utils.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
namespace grpc_event_engine {
namespace experimental {
namespace {
constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;
// A wrapper class to manage Event Engine endpoint ref counting and
// asynchronous shutdown.
class EventEngineEndpointWrapper {
public:
struct grpc_event_engine_endpoint {
grpc_endpoint base;
EventEngineEndpointWrapper* wrapper;
std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
read_buffer;
std::aligned_storage<sizeof(SliceBuffer), alignof(SliceBuffer)>::type
write_buffer;
};
explicit EventEngineEndpointWrapper(
std::unique_ptr<EventEngine::Endpoint> endpoint);
int Fd() {
grpc_core::MutexLock lock(&mu_);
return fd_;
}
absl::string_view PeerAddress() {
grpc_core::MutexLock lock(&mu_);
return peer_address_;
}
absl::string_view LocalAddress() {
grpc_core::MutexLock lock(&mu_);
return local_address_;
}
void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
if (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
delete this;
}
}
// Returns a managed grpc_endpoint object. It retains ownership of the
// object.
grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
// Read using the underlying EventEngine endpoint object.
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const EventEngine::Endpoint::ReadArgs* args) {
endpoint_->Read(std::move(on_read), buffer, args);
}
// Write using the underlying EventEngine endpoint object
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args) {
endpoint_->Write(std::move(on_writable), data, args);
}
// Returns true if the endpoint is not yet shutdown. In that case, it also
// acquires a shutdown ref. Otherwise it returns false and doesn't modify
// the shutdown ref.
bool ShutdownRef() {
int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
while (true) {
if (curr & kShutdownBit) {
return false;
}
if (shutdown_ref_.compare_exchange_strong(curr, curr + 1,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
return true;
}
}
}
// Decrement the shutdown ref. If this is the last shutdown ref, it also
// deletes the underlying event engine endpoint. Deletion of the event
// engine endpoint should trigger execution of any pending read/write
// callbacks with NOT-OK status.
void ShutdownUnref() {
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
OnShutdownInternal();
}
}
// If trigger shutdown is called the first time, it sets the shutdown bit
// and decrements the shutdown ref. If trigger shutdown has been called
// before or in parallel, only one of them would win the race. The other
// invocation would simply return.
void TriggerShutdown() {
int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
while (true) {
if (curr & kShutdownBit) {
return;
}
if (shutdown_ref_.compare_exchange_strong(curr, curr | kShutdownBit,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
Ref();
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
OnShutdownInternal();
}
return;
}
}
}
private:
void OnShutdownInternal() {
{
grpc_core::MutexLock lock(&mu_);
fd_ = -1;
local_address_ = "";
peer_address_ = "";
}
endpoint_.reset();
// For the Ref taken in TriggerShutdown
Unref();
}
std::unique_ptr<EventEngine::Endpoint> endpoint_;
std::unique_ptr<grpc_event_engine_endpoint> eeep_;
std::atomic<int64_t> refs_{1};
std::atomic<int64_t> shutdown_ref_{1};
grpc_core::Mutex mu_;
std::string peer_address_;
std::string local_address_;
int fd_{-1};
};
// Read from the endpoint and place the data in slices slice buffer. The
// provided closure is also invoked asynchronously.
void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /* urgent */, int min_progress_size) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (!eeep->wrapper->ShutdownRef()) {
// Shutdown has already been triggered on the endpoint.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
return;
}
eeep->wrapper->Ref();
EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
// TODO(vigneshbabu): Use SliceBufferCast<> here.
SliceBuffer* read_buffer = new (&eeep->read_buffer)
SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices));
read_buffer->Clear();
eeep->wrapper->Read(
[eeep, cb, slices](absl::Status status) {
auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep->read_buffer);
grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(), slices);
read_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str(),
status.ToString().c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < slices->count; i++) {
char* dump = grpc_dump_slice(slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
gpr_free(dump);
}
}
}
{
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EndpointRead
eeep->wrapper->Unref();
},
read_buffer, &read_args);
eeep->wrapper->ShutdownUnref();
}
// Write the data from slices and invoke the provided closure asynchronously
// after the write is complete.
void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg, int max_frame_size) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (!eeep->wrapper->ShutdownRef()) {
// Shutdown has already been triggered on the endpoint.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::CancelledError());
return;
}
eeep->wrapper->Ref();
EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < slices->count; i++) {
char* dump =
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
gpr_free(dump);
}
}
}
// TODO(vigneshbabu): Use SliceBufferCast<> here.
SliceBuffer* write_buffer = new (&eeep->write_buffer)
SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices));
eeep->wrapper->Write(
[eeep, cb](absl::Status status) {
auto* write_buffer =
reinterpret_cast<SliceBuffer*>(&eeep->write_buffer);
write_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str(),
status.ToString().c_str());
}
{
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EndpointWrite
eeep->wrapper->Unref();
},
write_buffer, &write_args);
eeep->wrapper->ShutdownUnref();
}
void EndpointAddToPollset(grpc_endpoint* /* ep */,
grpc_pollset* /* pollset */) {}
void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}
/// After shutdown, all endpoint operations except destroy are no-op,
/// and will return some kind of sane default (empty strings, nullptrs, etc).
/// It is the caller's responsibility to ensure that calls to EndpointShutdown
/// are synchronized.
void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
why.ToString().c_str());
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
why.ToString().c_str());
eeep->wrapper->TriggerShutdown();
}
// Attempts to free the underlying data structures.
void EndpointDestroy(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
eeep->wrapper->Unref();
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
}
absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
return eeep->wrapper->PeerAddress();
}
absl::string_view EndpointGetLocalAddress(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
return eeep->wrapper->LocalAddress();
}
int EndpointGetFd(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
return eeep->wrapper->Fd();
}
bool EndpointCanTrackErr(grpc_endpoint* /* ep */) { return false; }
grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
EndpointRead,
EndpointWrite,
EndpointAddToPollset,
EndpointAddToPollsetSet,
EndpointDeleteFromPollsetSet,
EndpointShutdown,
EndpointDestroy,
EndpointGetPeerAddress,
EndpointGetLocalAddress,
EndpointGetFd,
EndpointCanTrackErr};
EventEngineEndpointWrapper::EventEngineEndpointWrapper(
std::unique_ptr<EventEngine::Endpoint> endpoint)
: endpoint_(std::move(endpoint)),
eeep_(std::make_unique<grpc_event_engine_endpoint>()) {
eeep_->base.vtable = &grpc_event_engine_endpoint_vtable;
eeep_->wrapper = this;
auto local_addr = ResolvedAddressToURI(endpoint_->GetLocalAddress());
if (local_addr.ok()) {
local_address_ = *local_addr;
}
auto peer_addr = ResolvedAddressToURI(endpoint_->GetPeerAddress());
if (peer_addr.ok()) {
peer_address_ = *peer_addr;
}
#ifdef GRPC_POSIX_SOCKET_TCP
fd_ = reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->GetWrappedFd();
#else // GRPC_POSIX_SOCKET_TCP
fd_ = -1;
#endif // GRPC_POSIX_SOCKET_TCP
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Create", eeep_->wrapper);
}
} // namespace
grpc_endpoint* grpc_event_engine_endpoint_create(
std::unique_ptr<EventEngine::Endpoint> ee_endpoint) {
GPR_DEBUG_ASSERT(ee_endpoint != nullptr);
auto wrapper = new EventEngineEndpointWrapper(std::move(ee_endpoint));
return wrapper->GetGrpcEndpoint();
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,34 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H
#include <grpc/support/port_platform.h>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/endpoint.h"
namespace grpc_event_engine {
namespace experimental {
/// Creates an internal grpc_endpoint struct from an EventEngine Endpoint.
/// Server code needs to create grpc_endpoints after the EventEngine has made
/// connections.
grpc_endpoint* grpc_event_engine_endpoint_create(
std::unique_ptr<EventEngine::Endpoint> ee_endpoint);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_ENDPOINT_H

@ -0,0 +1,82 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_event_engine {
namespace experimental {
int64_t event_engine_tcp_client_connect(
grpc_closure* on_connect, grpc_endpoint** endpoint,
const grpc_event_engine::experimental::EndpointConfig& config,
const grpc_resolved_address* addr, grpc_core::Timestamp deadline) {
auto resource_quota = reinterpret_cast<grpc_core::ResourceQuota*>(
config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA));
auto addr_uri = grpc_sockaddr_to_uri(addr);
EventEngine::ConnectionHandle handle = GetDefaultEventEngine()->Connect(
[on_connect,
endpoint](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
absl::Status conn_status = ep.ok() ? absl::OkStatus() : ep.status();
if (ep.ok()) {
*endpoint = grpc_event_engine_endpoint_create(std::move(*ep));
} else {
*endpoint = nullptr;
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Connect Status: %s",
ep.status().ToString().c_str());
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_connect,
absl_status_to_grpc_error(conn_status));
},
CreateResolvedAddress(*addr), config,
resource_quota != nullptr
? resource_quota->memory_quota()->CreateMemoryOwner(
absl::StrCat("tcp-client:", addr_uri.value()))
: grpc_event_engine::experimental::MemoryAllocator(),
std::max(grpc_core::Duration::Milliseconds(1),
deadline - grpc_core::Timestamp::Now()));
GRPC_EVENT_ENGINE_TRACE("EventEngine::Connect Peer: %s, handle: %" PRId64,
(*addr_uri).c_str(),
static_cast<int64_t>(handle.keys[0]));
return handle.keys[0];
}
bool event_engine_tcp_client_cancel_connect(int64_t connection_handle) {
GRPC_EVENT_ENGINE_TRACE("EventEngine::CancelConnect handle: %" PRId64,
connection_handle);
return GetDefaultEventEngine()->CancelConnect(
{static_cast<intptr_t>(connection_handle), 0});
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,44 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H
#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H
#include <grpc/support/port_platform.h>
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolved_address.h"
namespace grpc_event_engine {
namespace experimental {
/// Attempts to use event engine to connect to the specified remote address and
/// invokes the on_connect callback asynchronously upon connection
/// establishment, failure or timeout. It returns a 64 bit connection handle
/// which can later be used to cancel an in progress connection attempt.
int64_t event_engine_tcp_client_connect(grpc_closure* on_connect,
grpc_endpoint** endpoint,
const EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline);
/// Attempts to cancel an in progress connection attempt represented by the
/// passed in connection handle. It returns true if the cancellation attempt
/// succeeded. Otherwise it returns false.
bool event_engine_tcp_client_cancel_connect(int64_t connection_handle);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_TCP_CLIENT_H

@ -18,6 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
@ -35,9 +36,12 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/tcp_client.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -400,6 +404,10 @@ static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
const EndpointConfig& config,
const grpc_resolved_address* addr,
grpc_core::Timestamp deadline) {
if (grpc_event_engine::experimental::UseEventEngineClient()) {
return grpc_event_engine::experimental::event_engine_tcp_client_connect(
closure, ep, config, addr, deadline);
}
grpc_resolved_address mapped_addr;
grpc_core::PosixTcpOptions options(TcpOptionsFromEndpointConfig(config));
int fd = -1;
@ -415,6 +423,10 @@ static int64_t tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
}
static bool tcp_cancel_connect(int64_t connection_handle) {
if (grpc_event_engine::experimental::UseEventEngineClient()) {
return grpc_event_engine::experimental::
event_engine_tcp_client_cancel_connect(connection_handle);
}
if (connection_handle <= 0) {
return false;
}

@ -495,6 +495,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.cc',
'src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc',
'src/core/lib/event_engine/resolved_address.cc',
'src/core/lib/event_engine/shim.cc',
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
@ -578,6 +579,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
'src/core/lib/iomgr/executor.cc',
'src/core/lib/iomgr/fork_posix.cc',

@ -41,6 +41,7 @@
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/memory.h"
@ -81,8 +82,8 @@ std::list<Connection> CreateConnectedEndpoints(
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
EventEngine::ResolvedAddress resolved_addr =
URIToResolvedAddress(target_addr);
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification* server_signal = new grpc_core::Notification();
@ -108,12 +109,12 @@ std::list<Connection> CreateConnectedEndpoints(
std::make_unique<grpc_core::MemoryQuota>("foo"));
GPR_ASSERT(listener.ok());
EXPECT_TRUE((*listener)->Bind(resolved_addr).ok());
EXPECT_TRUE((*listener)->Bind(*resolved_addr).ok());
EXPECT_TRUE((*listener)->Start().ok());
// Create client socket and connect to the target address.
for (int i = 0; i < num_connections; ++i) {
int client_fd = ConnectToServerOrDie(resolved_addr);
int client_fd = ConnectToServerOrDie(*resolved_addr);
EventHandle* handle =
poller.CreateHandle(client_fd, "test", poller.CanTrackErrors());
EXPECT_NE(handle, nullptr);

@ -147,10 +147,11 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
std::string resolved_addr_str =
ResolvedAddressToNormalizedString(resolved_addr).value();
auto sockets = CreateConnectedSockets(resolved_addr);
ResolvedAddressToNormalizedString(*resolved_addr).value();
auto sockets = CreateConnectedSockets(*resolved_addr);
grpc_core::Notification signal;
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();
@ -162,8 +163,8 @@ TEST(PosixEventEngineTest, IndefiniteConnectTimeoutOrRstTest) {
EXPECT_EQ(status.status().code(), absl::StatusCode::kUnknown);
signal.Notify();
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 3s);
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
3s);
signal.WaitForNotification();
for (auto sock : sockets) {
close(sock);
@ -175,10 +176,11 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::shared_ptr<EventEngine> posix_ee = std::make_shared<PosixEventEngine>();
std::string resolved_addr_str =
ResolvedAddressToNormalizedString(resolved_addr).value();
auto sockets = CreateConnectedSockets(resolved_addr);
ResolvedAddressToNormalizedString(*resolved_addr).value();
auto sockets = CreateConnectedSockets(*resolved_addr);
grpc_core::ChannelArgs args;
auto quota = grpc_core::ResourceQuota::Default();
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
@ -189,8 +191,8 @@ TEST(PosixEventEngineTest, IndefiniteConnectCancellationTest) {
FAIL() << "The on_connect callback should not have run since the "
"connection attempt was cancelled.";
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-2"), 3s);
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-2"),
3s);
if (connection_handle.keys[0] > 0) {
ASSERT_TRUE(posix_ee->CancelConnect(connection_handle));
}

@ -39,6 +39,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -83,7 +84,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) {
ASSERT_FALSE(status.ok());
signal.Notify();
},
URIToResolvedAddress(target_addr), config,
*URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
signal.WaitForNotification();
WaitForSingleOwner(std::move(test_ee));
@ -101,6 +102,8 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification client_signal;
@ -123,7 +126,7 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
[](absl::Status status) { GPR_ASSERT(status.ok()); }, config,
std::make_unique<grpc_core::MemoryQuota>("foo"));
ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
ASSERT_TRUE(listener->Start().ok());
test_ee->Connect(
@ -133,8 +136,8 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) {
client_endpoint = std::move(*endpoint);
client_signal.Notify();
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
24h);
client_signal.WaitForNotification();
server_signal.WaitForNotification();
@ -195,7 +198,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
for (int i = 0; i < kNumListenerAddresses; i++) {
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
target_addrs.push_back(target_addr);
}
ASSERT_TRUE(listener->Start().ok());
@ -217,7 +220,7 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) {
client_endpoint = std::move(*endpoint);
client_signal.Notify();
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
*URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
client_config,
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),

@ -35,12 +35,10 @@
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/uri/uri_parser.h"
// IWYU pragma: no_include <sys/socket.h>
@ -84,19 +82,6 @@ void WaitForSingleOwner(std::shared_ptr<EventEngine>&& engine) {
}
}
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str) {
grpc_resolved_address addr;
absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
if (!uri.ok()) {
gpr_log(GPR_ERROR, "Failed to parse. Error: %s",
uri.status().ToString().c_str());
GPR_ASSERT(uri.ok());
}
GPR_ASSERT(grpc_parse_uri(*uri, &addr));
return EventEngine::ResolvedAddress(
reinterpret_cast<const sockaddr*>(addr.addr), addr.len);
}
void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data) {
buf->Append(Slice::FromCopiedString(data));
}
@ -201,7 +186,7 @@ absl::Status ConnectionManager::BindAndStartListener(
std::shared_ptr<EventEngine::Listener> listener((*status).release());
for (auto& addr : addrs) {
auto bind_status = listener->Bind(URIToResolvedAddress(addr));
auto bind_status = listener->Bind(*URIToResolvedAddress(addr));
if (!bind_status.ok()) {
gpr_log(GPR_ERROR, "Binding listener failed: %s",
bind_status.status().ToString().c_str());
@ -239,7 +224,7 @@ ConnectionManager::CreateConnection(std::string target_addr,
last_in_progress_connection_.SetClientEndpoint(std::move(*status));
}
},
URIToResolvedAddress(target_addr), config,
*URIToResolvedAddress(target_addr), config,
memory_quota_->CreateMemoryAllocator(conn_name), timeout);
auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();

@ -43,8 +43,6 @@ void AppendStringToSliceBuffer(SliceBuffer* buf, std::string data);
std::string ExtractSliceBufferIntoString(SliceBuffer* buf);
EventEngine::ResolvedAddress URIToResolvedAddress(std::string address_str);
// Returns a random message with bounded length.
std::string GetNextSendMessage();

@ -35,9 +35,11 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/resource_quota/memory_quota.h"
@ -75,6 +77,8 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification client_signal;
@ -96,7 +100,7 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
std::move(accept_cb), [](absl::Status /*status*/) {}, config,
std::make_unique<grpc_core::MemoryQuota>("foo"));
ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
ASSERT_TRUE(listener->Bind(*resolved_addr).ok());
ASSERT_TRUE(listener->Start().ok());
oracle_ee->Connect(
@ -106,8 +110,8 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
client_endpoint = std::move(*endpoint);
client_signal.Notify();
},
URIToResolvedAddress(target_addr), config,
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
24h);
client_signal.WaitForNotification();
server_signal.WaitForNotification();
@ -169,7 +173,7 @@ TEST_F(EventEngineServerTest,
for (int i = 0; i < kNumListenerAddresses; i++) {
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
ASSERT_TRUE(listener->Bind(*URIToResolvedAddress(target_addr)).ok());
target_addrs.push_back(target_addr);
}
ASSERT_TRUE(listener->Start().ok());
@ -191,7 +195,7 @@ TEST_F(EventEngineServerTest,
client_endpoint = std::move(*endpoint);
client_signal.Notify();
},
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
*URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
client_config,
memory_quota->CreateMemoryAllocator(
absl::StrCat("conn-", std::to_string(i))),

@ -61,7 +61,10 @@ grpc_cc_test(
srcs = ["endpoint_pair_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = ["endpoint_test"],
tags = [
"endpoint_test",
"event_engine_client_test",
],
deps = [
":endpoint_tests",
"//:gpr",
@ -245,6 +248,7 @@ grpc_cc_test(
language = "C++",
tags = [
"endpoint_test",
"event_engine_client_test",
"no_windows",
],
deps = [

@ -18,18 +18,94 @@
#include "src/core/lib/iomgr/endpoint_pair.h"
#include <chrono>
#include <gtest/gtest.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
using namespace std::chrono_literals;
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::URIToResolvedAddress;
grpc_endpoint_pair grpc_iomgr_event_engine_shim_endpoint_pair(
grpc_channel_args* c_args) {
grpc_core::ExecCtx ctx;
grpc_endpoint_pair p;
auto ee = grpc_event_engine::experimental::GetDefaultEventEngine();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
std::string target_addr = absl::StrCat(
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
auto resolved_addr = URIToResolvedAddress(target_addr);
GPR_ASSERT(resolved_addr.ok());
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
grpc_core::Notification client_signal;
grpc_core::Notification server_signal;
EventEngine::Listener::AcceptCallback accept_cb =
[&server_endpoint, &server_signal](
std::unique_ptr<EventEngine::Endpoint> ep,
grpc_core::MemoryAllocator /*memory_allocator*/) {
server_endpoint = std::move(ep);
server_signal.Notify();
};
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(c_args);
ChannelArgsEndpointConfig config(args);
auto listener = *ee->CreateListener(
std::move(accept_cb), [](absl::Status /*status*/) {}, config,
std::make_unique<grpc_core::MemoryQuota>("foo"));
GPR_ASSERT(listener->Bind(*resolved_addr).ok());
GPR_ASSERT(listener->Start().ok());
ee->Connect(
[&client_endpoint, &client_signal](
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> endpoint) {
GPR_ASSERT(endpoint.ok());
client_endpoint = std::move(*endpoint);
client_signal.Notify();
},
*resolved_addr, config, memory_quota->CreateMemoryAllocator("conn-1"),
24h);
client_signal.WaitForNotification();
server_signal.WaitForNotification();
p.client = grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
std::move(client_endpoint));
p.server = grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
std::move(server_endpoint));
return p;
}
} // namespace
static gpr_mu* g_mu;
static grpc_pollset* g_pollset;
@ -44,7 +120,12 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", &args);
grpc_endpoint_pair p;
if (grpc_event_engine::experimental::UseEventEngineClient()) {
p = grpc_iomgr_event_engine_shim_endpoint_pair(&args);
} else {
p = grpc_iomgr_create_endpoint_pair("test", &args);
}
f.client_ep = p.client;
f.server_ep = p.server;
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);

@ -30,6 +30,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"
@ -129,26 +130,34 @@ static void read_scheduler(void* data, grpc_error_handle /* error */) {
/*urgent=*/false, /*min_progress_size=*/1);
}
static void read_and_write_test_read_handler_read_done(
read_and_write_test_state* state, int read_done_state) {
gpr_log(GPR_DEBUG, "Read handler done");
gpr_mu_lock(g_mu);
state->read_done = read_done_state;
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
gpr_mu_unlock(g_mu);
}
static void read_and_write_test_read_handler(void* data,
grpc_error_handle error) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
if (!error.ok()) {
read_and_write_test_read_handler_read_done(state, 1);
return;
}
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes || !error.ok()) {
gpr_log(GPR_DEBUG, "Read handler done");
gpr_mu_lock(g_mu);
state->read_done = 1 + (error.ok());
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
gpr_mu_unlock(g_mu);
} else if (error.ok()) {
if (state->bytes_read == state->target_bytes) {
read_and_write_test_read_handler_read_done(state, 2);
return;
}
// We perform many reads one after another. If grpc_endpoint_read and the
// read_handler are both run inline, we might end up growing the stack
// beyond the limit. Schedule the read on ExecCtx to avoid this.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
absl::OkStatus());
}
}
static void write_scheduler(void* data, grpc_error_handle /* error */) {

@ -304,6 +304,7 @@ grpc_cc_library(
"//:grpc",
"//:grpc_client_channel",
"//:orphanable",
"//:parse_address",
"//:ref_counted_ptr",
"//:server_address",
"//:uri_parser",

@ -2066,6 +2066,9 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/resolved_address_internal.h \
src/core/lib/event_engine/shim.cc \
src/core/lib/event_engine/shim.h \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
@ -2233,6 +2236,10 @@ src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.h \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.h \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.cc \

@ -1845,6 +1845,9 @@ src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.cc \
src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h \
src/core/lib/event_engine/resolved_address.cc \
src/core/lib/event_engine/resolved_address_internal.h \
src/core/lib/event_engine/shim.cc \
src/core/lib/event_engine/shim.h \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
@ -2015,6 +2018,10 @@ src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.h \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.h \
src/core/lib/iomgr/exec_ctx.cc \
src/core/lib/iomgr/exec_ctx.h \
src/core/lib/iomgr/executor.cc \

Loading…
Cancel
Save