[EventEngine] Modify iomgr to allow creation of posix event engine listeners and server side endpoints (#31928)

* [WIP] EventEngine iomgr endpoint shims

* [WIP] EventEngine::Endpoint iomgr shims for the PosixEventEngine

* Util functions to help with posix engine listener implementation

* sanity

* update comments in posix_engine_listener_utils.h

* review comments

* iwyu

* revert prev commit

* iwyu

* update build

* update

* regenerate projects

* regenerate projects

* minor fixes

* update BUILD

* sanity

* update build

* regenerate projects

* fix unused parameter

* sanity

* update

* sanity

* regenerate_projects

* remove unused variable

* start

* update

* regenerate_projects

* sanity

* update

* fixes

* update

* regenerate_projects

* update

* fix sanity and msan failure

* more fixes

* build failure

* update

* fix

* sanity

* fixes

* update

* regenerate projects

* fix sanity

* review comments

* An EventEngine subclass to be implemented by all posix based event engines

* sanity

* comments

* update

* review comments

* re-word

* fix

* update

* review comments

* regenerate projects

* syntax fix

* add lock free event benchmark

* releasable mutex lock

* fix build isue

* update

* start

* regenerate projects

* update

* fix

* windows build

* update

* windows portability issue

* update

* update

* update

* update

* format

* update

* update

* start

* Update tcp server interface to move on_accept_cb to create method

* update

* start

* update

* update

* update

* update

* update

* update

* update

* update

* sanity

* update

* update

* update

* windows build

* fix msan

* fix sanity

* regenerate projects

* update

* iwyu

* Fix resolved address length related bugs in tcp_socket_utils and listener_utils

* iwyu

* cleanup

* cleanup src/core/lib/event_engine/tcp_socket_utils.cc

* iwyu

* fix

* regenerate projects

* fix sanity

* re-write endpoint shim

* more re-write

* cleanup

* update

* regenerate projects

* review comments

* build issue

* more build issue fixes plus adding event_engine_trace

* even more build issue fixes

* iwyu

* add static_cast

* fix sanity

* update

* update

* sanity

* fix

* Fix

* Review comments

* fix

* iwyu

* fix build issue

Co-authored-by: AJ Heller <hork@google.com>
pull/32285/head
Vignesh Babu 2 years ago committed by GitHub
parent 4a64142be5
commit fa5a6c42a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 4
      CMakeLists.txt
  3. 2
      Makefile
  4. 3
      bazel/experiments.bzl
  5. 8
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 3
      grpc.gyp
  12. 2
      package.xml
  13. 1
      src/core/BUILD
  14. 17
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  15. 8
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  16. 20
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  17. 3
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  18. 14
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  19. 17
      src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
  20. 3
      src/core/lib/event_engine/posix_engine/posix_engine_listener.h
  21. 12
      src/core/lib/event_engine/shim.cc
  22. 2
      src/core/lib/event_engine/shim.h
  23. 3
      src/core/lib/experiments/experiments.cc
  24. 3
      src/core/lib/experiments/experiments.h
  25. 7
      src/core/lib/experiments/experiments.yaml
  26. 62
      src/core/lib/iomgr/event_engine_shims/closure.cc
  27. 39
      src/core/lib/iomgr/event_engine_shims/closure.h
  28. 57
      src/core/lib/iomgr/event_engine_shims/endpoint.cc
  29. 9
      src/core/lib/iomgr/event_engine_shims/endpoint.h
  30. 5
      src/core/lib/iomgr/tcp_posix.cc
  31. 211
      src/core/lib/iomgr/tcp_server_posix.cc
  32. 10
      src/core/lib/iomgr/tcp_server_utils_posix.h
  33. 10
      src/core/lib/transport/handshaker.cc
  34. 1
      src/python/grpcio/grpc_core_dependencies.py
  35. 1
      test/core/end2end/fixtures/http_proxy_fixture.cc
  36. 6
      test/core/iomgr/BUILD
  37. 52
      test/core/iomgr/tcp_posix_test.cc
  38. 11
      test/core/iomgr/tcp_server_posix_test.cc
  39. 1
      test/core/surface/concurrent_connectivity_test.cc
  40. 8
      test/core/util/test_tcp_server.cc
  41. 2
      tools/doxygen/Doxyfile.c++.internal
  42. 2
      tools/doxygen/Doxyfile.core.internal

@ -1275,6 +1275,7 @@ grpc_cc_library(
# These headers used to be vended by this target, but they have to be
# removed after landing event engine.
[
"//src/core:lib/iomgr/event_engine_shims/closure.cc",
"//src/core:lib/iomgr/event_engine_shims/endpoint.cc",
"//src/core:lib/iomgr/event_engine_shims/tcp_client.cc",
],
@ -1365,6 +1366,7 @@ grpc_cc_library(
# These headers used to be vended by this target, but they have to be
# removed after landing event engine.
[
"//src/core:lib/iomgr/event_engine_shims/closure.h",
"//src/core:lib/iomgr/event_engine_shims/endpoint.h",
"//src/core:lib/iomgr/event_engine_shims/tcp_client.h",
],

4
CMakeLists.txt generated

@ -2228,6 +2228,7 @@ add_library(grpc
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/closure.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
@ -2903,6 +2904,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/closure.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
@ -4391,6 +4393,7 @@ add_library(grpc_authorization_provider
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/closure.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc
@ -11457,6 +11460,7 @@ add_executable(frame_test
src/core/lib/iomgr/ev_poll_posix.cc
src/core/lib/iomgr/ev_posix.cc
src/core/lib/iomgr/ev_windows.cc
src/core/lib/iomgr/event_engine_shims/closure.cc
src/core/lib/iomgr/event_engine_shims/endpoint.cc
src/core/lib/iomgr/event_engine_shims/tcp_client.cc
src/core/lib/iomgr/exec_ctx.cc

2
Makefile generated

@ -1485,6 +1485,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/closure.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \
@ -2019,6 +2020,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/closure.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \

@ -33,6 +33,9 @@ EXPERIMENTS = {
"event_engine_client_test": [
"event_engine_client",
],
"event_engine_listener_test": [
"event_engine_listener",
],
"flow_control_test": [
"peer_state_based_framing",
"tcp_frame_size_tuning",

@ -868,6 +868,7 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/closure.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
@ -1619,6 +1620,7 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/closure.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
@ -2195,6 +2197,7 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/closure.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
@ -2560,6 +2563,7 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/closure.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
@ -3639,6 +3643,7 @@ libs:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/closure.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
@ -3883,6 +3888,7 @@ libs:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/closure.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc
@ -7434,6 +7440,7 @@ targets:
- src/core/lib/iomgr/ev_epoll1_linux.h
- src/core/lib/iomgr/ev_poll_posix.h
- src/core/lib/iomgr/ev_posix.h
- src/core/lib/iomgr/event_engine_shims/closure.h
- src/core/lib/iomgr/event_engine_shims/endpoint.h
- src/core/lib/iomgr/event_engine_shims/tcp_client.h
- src/core/lib/iomgr/exec_ctx.h
@ -7660,6 +7667,7 @@ targets:
- src/core/lib/iomgr/ev_poll_posix.cc
- src/core/lib/iomgr/ev_posix.cc
- src/core/lib/iomgr/ev_windows.cc
- src/core/lib/iomgr/event_engine_shims/closure.cc
- src/core/lib/iomgr/event_engine_shims/endpoint.cc
- src/core/lib/iomgr/event_engine_shims/tcp_client.cc
- src/core/lib/iomgr/exec_ctx.cc

1
config.m4 generated

@ -610,6 +610,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/ev_poll_posix.cc \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/closure.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \
src/core/lib/iomgr/exec_ctx.cc \

1
config.w32 generated

@ -576,6 +576,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\iomgr\\ev_poll_posix.cc " +
"src\\core\\lib\\iomgr\\ev_posix.cc " +
"src\\core\\lib\\iomgr\\ev_windows.cc " +
"src\\core\\lib\\iomgr\\event_engine_shims\\closure.cc " +
"src\\core\\lib\\iomgr\\event_engine_shims\\endpoint.cc " +
"src\\core\\lib\\iomgr\\event_engine_shims\\tcp_client.cc " +
"src\\core\\lib\\iomgr\\exec_ctx.cc " +

2
gRPC-C++.podspec generated

@ -843,6 +843,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/closure.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',
@ -1769,6 +1770,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/closure.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',

3
gRPC-Core.podspec generated

@ -1343,6 +1343,8 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/closure.cc',
'src/core/lib/iomgr/event_engine_shims/closure.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
@ -2450,6 +2452,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/ev_epoll1_linux.h',
'src/core/lib/iomgr/ev_poll_posix.h',
'src/core/lib/iomgr/ev_posix.h',
'src/core/lib/iomgr/event_engine_shims/closure.h',
'src/core/lib/iomgr/event_engine_shims/endpoint.h',
'src/core/lib/iomgr/event_engine_shims/tcp_client.h',
'src/core/lib/iomgr/exec_ctx.h',

2
grpc.gemspec generated

@ -1254,6 +1254,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/ev_posix.cc )
s.files += %w( src/core/lib/iomgr/ev_posix.h )
s.files += %w( src/core/lib/iomgr/ev_windows.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/closure.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/closure.h )
s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.cc )
s.files += %w( src/core/lib/iomgr/event_engine_shims/endpoint.h )
s.files += %w( src/core/lib/iomgr/event_engine_shims/tcp_client.cc )

3
grpc.gyp generated

@ -898,6 +898,7 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/closure.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
@ -1374,6 +1375,7 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/closure.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',
@ -1873,6 +1875,7 @@
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/closure.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',

2
package.xml generated

@ -1236,6 +1236,8 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/ev_windows.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/closure.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/endpoint.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/endpoint.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/event_engine_shims/tcp_client.cc" role="src" />

@ -1741,6 +1741,7 @@ grpc_cc_library(
"status_helper",
"strerror",
"time",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_public_hdrs",

@ -285,7 +285,9 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
int* release_fd,
absl::string_view reason) {
bool is_release_fd = (release_fd != nullptr);
bool was_shutdown = false;
if (!read_closure_->IsShutdown()) {
was_shutdown = true;
HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason),
is_release_fd);
}
@ -293,8 +295,17 @@ void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
// If release_fd is not NULL, we should be relinquishing control of the file
// descriptor fd->fd (but we still own the grpc_fd structure).
if (is_release_fd) {
if (!was_shutdown) {
epoll_event phony_event;
if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
&phony_event) != 0) {
gpr_log(GPR_ERROR, "OrphanHandle: epoll_ctl failed: %s",
grpc_core::StrError(errno).c_str());
}
}
*release_fd = fd_;
} else {
shutdown(fd_, SHUT_RDWR);
close(fd_);
}
@ -328,13 +339,11 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
if (read_closure_->SetShutdown(why)) {
if (!releasing_fd) {
shutdown(fd_, SHUT_RDWR);
} else {
if (releasing_fd) {
epoll_event phony_event;
if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
&phony_event) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s",
gpr_log(GPR_ERROR, "HandleShutdownInternal: epoll_ctl failed: %s",
grpc_core::StrError(errno).c_str());
}
}

@ -378,13 +378,13 @@ void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
grpc_core::StatusSetInt(&shutdown_error_,
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
SetReadyLocked(&read_closure_);
SetReadyLocked(&write_closure_);
}
// signal read/write closed to OS so that future operations fail.
if (!released_) {
shutdown(fd_, SHUT_RDWR);
}
SetReadyLocked(&read_closure_);
SetReadyLocked(&write_closure_);
}
if (!IsWatched()) {
CloseFd();
} else {
@ -455,8 +455,6 @@ void PollEventHandle::ShutdownHandle(absl::Status why) {
grpc_core::StatusSetInt(&shutdown_error_,
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
// signal read/write closed to OS so that future operations fail.
shutdown(fd_, SHUT_RDWR);
SetReadyLocked(&read_closure_);
SetReadyLocked(&write_closure_);
}

@ -44,6 +44,7 @@
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
@ -465,9 +466,11 @@ void PosixEndpointImpl::MaybePostReclaimer() {
has_posted_reclaimer_ = true;
memory_owner_.PostReclaimer(
grpc_core::ReclamationPass::kBenign,
[this](absl::optional<grpc_core::ReclamationSweep> sweep) {
if (!sweep.has_value()) return;
PerformReclamation();
[self = Ref(DEBUG_LOCATION, "Posix Reclaimer")](
absl::optional<grpc_core::ReclamationSweep> sweep) {
if (sweep.has_value()) {
self->PerformReclamation();
}
});
}
}
@ -548,7 +551,7 @@ void PosixEndpointImpl::MaybeMakeReadSlices() {
void PosixEndpointImpl::HandleRead(absl::Status status) {
read_mu_.Lock();
if (status.ok()) {
if (status.ok() && memory_owner_.is_valid()) {
MaybeMakeReadSlices();
if (!TcpDoRead(status)) {
UpdateRcvLowat();
@ -558,6 +561,9 @@ void PosixEndpointImpl::HandleRead(absl::Status status) {
return;
}
} else {
if (!memory_owner_.is_valid()) {
status = absl::UnknownError("Shutting down endpoint");
}
incoming_buffer_->Clear();
last_read_buffer_.Clear();
}
@ -1158,6 +1164,9 @@ void PosixEndpointImpl::MaybeShutdown(
grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
handle_->ShutdownHandle(why);
read_mu_.Lock();
memory_owner_.Reset();
read_mu_.Unlock();
Unref();
}
@ -1189,7 +1198,8 @@ PosixEndpointImpl::PosixEndpointImpl(EventHandle* handle,
fd_ = handle_->WrappedFd();
GPR_ASSERT(options.resource_quota != nullptr);
auto peer_addr_string = sock.PeerAddressString();
memory_owner_ = options.resource_quota->memory_quota()->CreateMemoryOwner(
mem_quota_ = options.resource_quota->memory_quota();
memory_owner_ = mem_quota_->CreateMemoryOwner(
peer_addr_string.ok() ? *peer_addr_string : "");
self_reservation_ = memory_owner_.MakeReservation(sizeof(PosixEndpointImpl));
auto local_address = sock.LocalAddress();

@ -564,6 +564,9 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
grpc_event_engine::experimental::EventEngine::ResolvedAddress peer_address_;
grpc_event_engine::experimental::EventEngine::ResolvedAddress local_address_;
// Maintain a shared_ptr to mem_quota_ to ensure the underlying basic memory
// quota is not deleted until the endpoint is destroyed.
grpc_core::MemoryQuotaRefPtr mem_quota_;
grpc_core::MemoryOwner memory_owner_;
grpc_core::MemoryAllocator::Reservation self_reservation_;

@ -66,6 +66,10 @@ using namespace std::chrono_literals;
namespace grpc_event_engine {
namespace experimental {
bool NeedPosixEngine() {
return UseEventEngineClient() || UseEventEngineListener();
}
#ifdef GRPC_POSIX_SOCKET_TCP
void AsyncConnect::Start(EventEngine::Duration timeout) {
@ -332,7 +336,7 @@ PosixEventEngine::PosixEventEngine(PosixEventPoller* poller)
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (UseEventEngineClient()) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
}
}
@ -341,7 +345,7 @@ PosixEventEngine::PosixEventEngine()
: connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
executor_(std::make_shared<ThreadPool>()),
timer_manager_(executor_) {
if (UseEventEngineClient()) {
if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() {
@ -539,7 +543,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
OnConnectCallback on_connect, const ResolvedAddress& addr,
const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) {
if (!UseEventEngineClient()) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
@ -564,7 +568,7 @@ std::unique_ptr<PosixEndpointWithFdSupport>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
if (!UseEventEngineClient()) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP
@ -613,7 +617,7 @@ PosixEventEngine::CreatePosixListener(
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
if (!UseEventEngineClient()) {
if (!NeedPosixEngine()) {
grpc_core::Crash("unimplemented");
}
#ifdef GRPC_POSIX_SOCKET_TCP

@ -160,6 +160,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
Unref();
return;
}
addr = EventEngine::ResolvedAddress(addr.address(), len);
}
PosixSocketWrapper sock(fd);
@ -176,14 +177,22 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
}
// Create an Endpoint here.
std::string peer_name = *ResolvedAddressToNormalizedString(addr);
auto peer_name = ResolvedAddressToURI(addr);
if (!peer_name.ok()) {
gpr_log(GPR_ERROR, "Invalid address: %s",
peer_name.status().ToString().c_str());
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
auto endpoint = CreatePosixEndpoint(
/*handle=*/listener_->poller_->CreateHandle(
fd, peer_name, listener_->poller_->CanTrackErrors()),
fd, *peer_name, listener_->poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/listener_->engine_,
// allocator=
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("endpoint-tcp-server-connection: ", peer_name)),
absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)),
/*options=*/listener_->options_);
// Call on_accept_ and then resume accepting new connections by continuing
// the parent for-loop.
@ -192,7 +201,7 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
/*is_external=*/false,
/*memory_allocator=*/
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("on-accept-tcp-server-connection: ", peer_name)),
absl::StrCat("on-accept-tcp-server-connection: ", *peer_name)),
/*pending_data=*/nullptr);
}
GPR_UNREACHABLE_CODE(return);

@ -238,8 +238,7 @@ class PosixEngineListener : public PosixListenerWithFdSupport {
#include "src/core/lib/gprpp/crash.h"
class PosixEngineListener
: public grpc_event_engine::experimental::EventEngine::Listener {
class PosixEngineListener : public PosixListenerWithFdSupport {
public:
PosixEngineListener() = default;
~PosixEngineListener() override = default;

@ -23,7 +23,7 @@ namespace grpc_event_engine {
namespace experimental {
bool UseEventEngineClient() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engine's become
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become
// available for other platforms.
#ifdef GRPC_POSIX_SOCKET_TCP
return grpc_core::IsEventEngineClientEnabled();
@ -32,5 +32,15 @@ bool UseEventEngineClient() {
#endif
}
bool UseEventEngineListener() {
// TODO(hork, eryu): Adjust the ifdefs accordingly when event engines become
// available for other platforms.
#ifdef GRPC_POSIX_SOCKET_TCP
return grpc_core::IsEventEngineListenerEnabled();
#else
return false;
#endif
}
} // namespace experimental
} // namespace grpc_event_engine

@ -21,6 +21,8 @@ namespace experimental {
bool UseEventEngineClient();
bool UseEventEngineListener();
} // namespace experimental
} // namespace grpc_event_engine

@ -51,6 +51,8 @@ const char* const description_free_large_allocator =
const char* const description_transport_supplies_client_latency =
"If set, use the transport represented value for client latency in "
"opencensus";
const char* const description_event_engine_listener =
"Use EventEngine listeners instead of iomgr's grpc_tcp_server";
} // namespace
namespace grpc_core {
@ -71,6 +73,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
{"free_large_allocator", description_free_large_allocator, false},
{"transport_supplies_client_latency",
description_transport_supplies_client_latency, false},
{"event_engine_listener", description_event_engine_listener, false},
};
} // namespace grpc_core

@ -43,6 +43,7 @@ inline bool IsFreeLargeAllocatorEnabled() { return IsExperimentEnabled(10); }
inline bool IsTransportSuppliesClientLatencyEnabled() {
return IsExperimentEnabled(11);
}
inline bool IsEventEngineListenerEnabled() { return IsExperimentEnabled(12); }
struct ExperimentMetadata {
const char* name;
@ -50,7 +51,7 @@ struct ExperimentMetadata {
bool default_value;
};
constexpr const size_t kNumExperiments = 12;
constexpr const size_t kNumExperiments = 13;
extern const ExperimentMetadata g_experiment_metadata[kNumExperiments];
} // namespace grpc_core

@ -126,3 +126,10 @@
expiry: 2023/06/01
owner: ctiller@google.com
test_tags: [census_test]
- name: event_engine_listener
description:
Use EventEngine listeners instead of iomgr's grpc_tcp_server
default: false
expiry: 2023/02/13
owner: vigneshbabu@google.com
test_tags: ["event_engine_listener_test"]

@ -0,0 +1,62 @@
// Copyright 2021 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_event_engine {
namespace experimental {
void RunEventEngineClosure(grpc_closure* closure, grpc_error_handle error) {
if (closure == nullptr) {
return;
}
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
#ifndef NDEBUG
closure->scheduled = false;
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG,
"EventEngine: running closure %p: created [%s:%d]: %s [%s:%d]",
closure, closure->file_created, closure->line_created,
closure->run ? "run" : "scheduled", closure->file_initiated,
closure->line_initiated);
}
#endif
closure->cb(closure->cb_arg, error);
#ifndef NDEBUG
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "EventEngine: closure %p finished", closure);
}
#endif
}
absl::AnyInvocable<void(absl::Status)> GrpcClosureToStatusCallback(
grpc_closure* closure) {
return [closure](absl::Status status) {
RunEventEngineClosure(closure, absl_status_to_grpc_error(status));
};
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,39 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H
#define GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H
#include <grpc/support/port_platform.h>
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
namespace grpc_event_engine {
namespace experimental {
/// Runs a grpc_closure inline with the specified error handle.
void RunEventEngineClosure(grpc_closure* closure, grpc_error_handle error);
/// Creates a callback that takes an error status argument.
absl::AnyInvocable<void(absl::Status)> GrpcClosureToStatusCallback(
grpc_closure* closure);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_IOMGR_EVENT_ENGINE_SHIMS_CLOSURE_H

@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@ -45,7 +46,6 @@ extern grpc_core::TraceFlag grpc_tcp_trace;
namespace grpc_event_engine {
namespace experimental {
namespace {
constexpr int64_t kShutdownBit = static_cast<int64_t>(1) << 32;
@ -128,6 +128,12 @@ class EventEngineEndpointWrapper {
void ShutdownUnref() {
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
#ifdef GRPC_POSIX_SOCKET_TCP
if (fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
}
#endif // GRPC_POSIX_SOCKET_TCP
OnShutdownInternal();
}
}
@ -136,7 +142,13 @@ class EventEngineEndpointWrapper {
// and decrements the shutdown ref. If trigger shutdown has been called
// before or in parallel, only one of them would win the race. The other
// invocation would simply return.
void TriggerShutdown() {
void TriggerShutdown(
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
#ifdef GRPC_POSIX_SOCKET_TCP
on_release_fd_ = std::move(on_release_fd);
#else
(void)on_release_fd;
#endif // GRPC_POSIX_SOCKET_TCP
int64_t curr = shutdown_ref_.load(std::memory_order_acquire);
while (true) {
if (curr & kShutdownBit) {
@ -148,6 +160,12 @@ class EventEngineEndpointWrapper {
Ref();
if (shutdown_ref_.fetch_sub(1, std::memory_order_acq_rel) ==
kShutdownBit + 1) {
#ifdef GRPC_POSIX_SOCKET_TCP
if (fd_ > 0 && on_release_fd_) {
reinterpret_cast<PosixEndpointWithFdSupport*>(endpoint_.get())
->Shutdown(std::move(on_release_fd_));
}
#endif // GRPC_POSIX_SOCKET_TCP
OnShutdownInternal();
}
return;
@ -171,6 +189,9 @@ class EventEngineEndpointWrapper {
std::unique_ptr<grpc_event_engine_endpoint> eeep_;
std::atomic<int64_t> refs_{1};
std::atomic<int64_t> shutdown_ref_{1};
#ifdef GRPC_POSIX_SOCKET_TCP
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
#endif // GRPC_POSIX_SOCKET_TCP
grpc_core::Mutex mu_;
std::string peer_address_;
std::string local_address_;
@ -304,7 +325,7 @@ void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
why.ToString().c_str());
eeep->wrapper->TriggerShutdown();
eeep->wrapper->TriggerShutdown(nullptr);
}
// Attempts to free the underlying data structures.
@ -312,8 +333,8 @@ void EndpointDestroy(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
eeep->wrapper->Unref();
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
eeep->wrapper->Unref();
}
absl::string_view EndpointGetPeerAddress(grpc_endpoint* ep) {
@ -384,5 +405,33 @@ grpc_endpoint* grpc_event_engine_endpoint_create(
return wrapper->GetGrpcEndpoint();
}
bool grpc_is_event_engine_endpoint(grpc_endpoint* ep) {
return ep->vtable == &grpc_event_engine_endpoint_vtable;
}
void grpc_event_engine_endpoint_destroy_and_release_fd(
grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (fd == nullptr || on_release_fd == nullptr) {
if (fd != nullptr) {
*fd = -1;
}
eeep->wrapper->TriggerShutdown(nullptr);
} else {
*fd = -1;
eeep->wrapper->TriggerShutdown(
[fd, on_release_fd](absl::StatusOr<int> release_fd) {
if (release_fd.ok()) {
*fd = *release_fd;
}
RunEventEngineClosure(on_release_fd,
absl_status_to_grpc_error(release_fd.status()));
});
}
eeep->wrapper->Unref();
}
} // namespace experimental
} // namespace grpc_event_engine

@ -28,6 +28,15 @@ namespace experimental {
grpc_endpoint* grpc_event_engine_endpoint_create(
std::unique_ptr<EventEngine::Endpoint> ee_endpoint);
/// Returns true if the passed endpoint is an event engine shim endpoint.
bool grpc_is_event_engine_endpoint(grpc_endpoint* ep);
/// Destroys the passed in event engine shim endpoint and schedules the
/// asynchronous execution of the on_release_fd callback. The int pointer fd is
/// set to the underlying endpoint's file descriptor.
void grpc_event_engine_endpoint_destroy_and_release_fd(
grpc_endpoint* ep, int* fd, grpc_closure* on_release_fd);
} // namespace experimental
} // namespace grpc_event_engine

@ -61,6 +61,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@ -2016,6 +2017,10 @@ int grpc_tcp_fd(grpc_endpoint* ep) {
void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
grpc_closure* done) {
if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(ep)) {
return grpc_event_engine::experimental::
grpc_event_engine_endpoint_destroy_and_release_fd(ep, fd, done);
}
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;

@ -43,6 +43,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include <grpc/byte_buffer.h>
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -50,10 +51,16 @@
#include <grpc/support/time.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/resolved_address_internal.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/strerror.h"
#include "src/core/lib/iomgr/event_engine_shims/closure.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
@ -64,10 +71,127 @@
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/transport/error_utils.h"
static std::atomic<int64_t> num_dropped_connections{0};
using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::MemoryAllocator;
using ::grpc_event_engine::experimental::PosixEventEngineWithFdSupport;
using ::grpc_event_engine::experimental::SliceBuffer;
class MemoryAllocatorFactoryWrapper
: public grpc_event_engine::experimental::MemoryAllocatorFactory {
public:
explicit MemoryAllocatorFactoryWrapper(
grpc_core::MemoryQuotaRefPtr memory_quota)
: memory_quota_(std::move(memory_quota)) {}
MemoryAllocator CreateMemoryAllocator(absl::string_view name) override {
return memory_quota_->CreateMemoryAllocator(name);
}
private:
grpc_core::MemoryQuotaRefPtr memory_quota_;
};
static grpc_error_handle CreateEventEngineListener(
grpc_tcp_server* s, grpc_closure* shutdown_complete,
const EndpointConfig& config, grpc_tcp_server** server) {
PosixEventEngineWithFdSupport::PosixAcceptCallback accept_cb =
[s](int listener_fd, std::unique_ptr<EventEngine::Endpoint> ep,
bool is_external, MemoryAllocator /*allocator*/,
SliceBuffer* pending_data) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_tcp_server_acceptor* acceptor =
static_cast<grpc_tcp_server_acceptor*>(
gpr_malloc(sizeof(*acceptor)));
acceptor->from_server = s;
acceptor->port_index = -1;
acceptor->fd_index = -1;
if (!is_external) {
auto it = s->listen_fd_to_index_map.find(listener_fd);
if (it != s->listen_fd_to_index_map.end()) {
acceptor->port_index = std::get<0>(it->second);
acceptor->fd_index = std::get<1>(it->second);
}
} else {
// External connection handling.
grpc_resolved_address addr;
memset(&addr, 0, sizeof(addr));
addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
// Get the fd of the socket connected to peer.
int fd =
reinterpret_cast<
grpc_event_engine::experimental::PosixEndpointWithFdSupport*>(
ep.get())
->GetWrappedFd();
if (getpeername(fd, reinterpret_cast<struct sockaddr*>(addr.addr),
&(addr.len)) < 0) {
gpr_log(GPR_ERROR, "Failed getpeername: %s",
grpc_core::StrError(errno).c_str());
close(fd);
return;
}
(void)grpc_set_socket_no_sigpipe_if_possible(fd);
auto addr_uri = grpc_sockaddr_to_uri(&addr);
if (!addr_uri.ok()) {
gpr_log(GPR_ERROR, "Invalid address: %s",
addr_uri.status().ToString().c_str());
return;
}
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO,
"SERVER_CONNECT: incoming external connection: %s",
addr_uri->c_str());
}
}
grpc_pollset* read_notifier_pollset =
(*(s->pollsets))[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
&s->next_pollset_to_assign, 1)) %
s->pollsets->size()];
acceptor->external_connection = is_external;
acceptor->listener_fd = listener_fd;
grpc_byte_buffer* buf = nullptr;
if (pending_data != nullptr && pending_data->Length() > 0) {
buf = grpc_raw_byte_buffer_create(nullptr, 0);
grpc_slice_buffer_swap(&buf->data.raw.slice_buffer,
pending_data->c_slice_buffer());
pending_data->Clear();
}
acceptor->pending_data = buf;
s->on_accept_cb(
s->on_accept_cb_arg,
grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
std::move(ep)),
read_notifier_pollset, acceptor);
};
auto on_shutdown_complete_cb =
grpc_event_engine::experimental::GrpcClosureToStatusCallback(
shutdown_complete);
auto listener =
reinterpret_cast<PosixEventEngineWithFdSupport*>(
grpc_event_engine::experimental::GetDefaultEventEngine().get())
->CreatePosixListener(
std::move(accept_cb),
[s, shutdown_complete](absl::Status status) {
grpc_event_engine::experimental::RunEventEngineClosure(
shutdown_complete, absl_status_to_grpc_error(status));
delete s->fd_handler;
delete s;
},
config,
std::make_unique<MemoryAllocatorFactoryWrapper>(s->memory_quota));
if (!listener.ok()) {
delete s;
*server = nullptr;
return listener.status();
}
s->ee_listener = std::move(*listener);
return absl::OkStatus();
}
static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
const EndpointConfig& config,
@ -92,7 +216,11 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
s->shutdown = false;
s->shutdown_starting.head = nullptr;
s->shutdown_starting.tail = nullptr;
if (!grpc_event_engine::experimental::UseEventEngineListener()) {
s->shutdown_complete = shutdown_complete;
} else {
s->shutdown_complete = nullptr;
}
s->on_accept_cb = on_accept_cb;
s->on_accept_cb_arg = on_accept_cb_arg;
s->head = nullptr;
@ -105,7 +233,13 @@ static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
s->memory_quota = s->options.resource_quota->memory_quota();
s->pre_allocated_fd = -1;
gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
s->n_bind_ports = 0;
new (&s->listen_fd_to_index_map)
absl::flat_hash_map<int, std::tuple<int, int>>();
*server = s;
if (grpc_event_engine::experimental::UseEventEngineListener()) {
return CreateEventEngineListener(s, shutdown_complete, config, server);
}
return absl::OkStatus();
}
@ -123,9 +257,15 @@ static void finish_shutdown(grpc_tcp_server* s) {
s->head = sp->next;
gpr_free(sp);
}
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// This will trigger asynchronous execution of the on_shutdown_complete
// callback when appropriate. That callback will delete the server
s->ee_listener.reset();
} else {
delete s->fd_handler;
delete s;
}
}
static void destroyed_port(void* server, grpc_error_handle /*error*/) {
grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
@ -423,6 +563,30 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
static grpc_error_handle tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* out_port) {
if (grpc_event_engine::experimental::UseEventEngineListener()) {
gpr_mu_lock(&s->mu);
if (s->shutdown_listeners) {
gpr_mu_unlock(&s->mu);
return absl::UnknownError("Server already shutdown");
}
int fd_index = 0;
auto port = s->ee_listener->BindWithFd(
grpc_event_engine::experimental::CreateResolvedAddress(*addr),
[s, &fd_index](absl::StatusOr<int> listen_fd) {
if (!listen_fd.ok()) {
return;
}
GPR_DEBUG_ASSERT(*listen_fd > 0);
s->listen_fd_to_index_map.insert_or_assign(
*listen_fd, std::make_tuple(s->n_bind_ports, fd_index++));
});
if (port.ok()) {
s->n_bind_ports++;
*out_port = *port;
}
gpr_mu_unlock(&s->mu);
return port.status();
}
GPR_ASSERT(addr->len <= GRPC_MAX_SOCKADDR_SIZE);
grpc_tcp_listener* sp;
grpc_resolved_address sockname_temp;
@ -500,6 +664,17 @@ static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
unsigned num_fds = 0;
gpr_mu_lock(&s->mu);
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// This doesn't need to be very fast. Used in tests.
for (auto it = s->listen_fd_to_index_map.begin();
it != s->listen_fd_to_index_map.end(); it++) {
if (std::get<0>(it->second) == static_cast<int>(port_index)) {
num_fds++;
}
}
gpr_mu_unlock(&s->mu);
return num_fds;
}
grpc_tcp_listener* sp = get_port_index(s, port_index);
for (; sp; sp = sp->sibling) {
++num_fds;
@ -511,6 +686,19 @@ unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
unsigned fd_index) {
gpr_mu_lock(&s->mu);
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// This doesn't need to be very fast. Used in tests.
for (auto it = s->listen_fd_to_index_map.begin();
it != s->listen_fd_to_index_map.end(); it++) {
if (std::get<0>(it->second) == static_cast<int>(port_index) &&
std::get<1>(it->second) == static_cast<int>(fd_index)) {
gpr_mu_unlock(&s->mu);
return it->first;
}
}
gpr_mu_unlock(&s->mu);
return -1;
}
grpc_tcp_listener* sp = get_port_index(s, port_index);
for (; sp; sp = sp->sibling, --fd_index) {
if (fd_index == 0) {
@ -530,6 +718,12 @@ static void tcp_server_start(grpc_tcp_server* s,
GPR_ASSERT(s->on_accept_cb);
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
if (grpc_event_engine::experimental::UseEventEngineListener()) {
GPR_ASSERT(!s->shutdown_listeners);
GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_start", s->ee_listener->Start()));
gpr_mu_unlock(&s->mu);
return;
}
sp = s->head;
while (sp != nullptr) {
if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
@ -584,7 +778,10 @@ static void tcp_server_unref(grpc_tcp_server* s) {
static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
s->shutdown_listeners = true;
// shutdown all fd's
if (grpc_event_engine::experimental::UseEventEngineListener()) {
s->ee_listener->ShutdownListeningFds();
}
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
@ -611,6 +808,18 @@ class ExternalConnectionHandler : public grpc_core::TcpServerFdHandler {
// TODO(yangg) resolve duplicate code with on_read
void Handle(int listener_fd, int fd, grpc_byte_buffer* buf) override {
if (grpc_event_engine::experimental::UseEventEngineListener()) {
grpc_event_engine::experimental::SliceBuffer pending_data;
if (buf != nullptr) {
pending_data =
grpc_event_engine::experimental::SliceBuffer::TakeCSliceBuffer(
buf->data.raw.slice_buffer);
}
GPR_ASSERT(GRPC_LOG_IF_ERROR("listener_handle_external_connection",
s_->ee_listener->HandleExternalConnection(
listener_fd, fd, &pending_data)));
return;
}
grpc_pollset* read_notifier_pollset;
grpc_resolved_address addr;
memset(&addr, 0, sizeof(addr));

@ -21,6 +21,11 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include "absl/container/flat_hash_map.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@ -99,6 +104,11 @@ struct grpc_tcp_server {
// used to create slice allocators for endpoints, owned
grpc_core::MemoryQuotaRefPtr memory_quota;
/* used when event engine based servers are enabled */
int n_bind_ports = 0;
absl::flat_hash_map<int, std::tuple<int, int>> listen_fd_to_index_map;
std::unique_ptr<grpc_event_engine::experimental::PosixListenerWithFdSupport>
ee_listener = nullptr;
/* used to store a pre-allocated FD assigned to a socket */
int pre_allocated_fd;
};

@ -28,6 +28,7 @@
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/byte_buffer.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
@ -38,6 +39,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
namespace grpc_core {
@ -189,6 +191,14 @@ void HandshakeManager::DoHandshake(grpc_endpoint* endpoint,
acceptor->pending_data != nullptr) {
grpc_slice_buffer_swap(args_.read_buffer,
&(acceptor->pending_data->data.raw.slice_buffer));
// TODO(vigneshbabu): For connections accepted through event engine
// listeners, the ownership of the byte buffer received is transferred to
// this callback and it is thus this callback's duty to delete it.
// Make this hack default once event engine is rolled out.
if (grpc_event_engine::experimental::grpc_is_event_engine_endpoint(
endpoint)) {
grpc_byte_buffer_destroy(acceptor->pending_data);
}
}
// Initialize state needed for calling handshakers.
acceptor_ = acceptor;

@ -585,6 +585,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/ev_poll_posix.cc',
'src/core/lib/iomgr/ev_posix.cc',
'src/core/lib/iomgr/ev_windows.cc',
'src/core/lib/iomgr/event_engine_shims/closure.cc',
'src/core/lib/iomgr/event_engine_shims/endpoint.cc',
'src/core/lib/iomgr/event_engine_shims/tcp_client.cc',
'src/core/lib/iomgr/exec_ctx.cc',

@ -656,6 +656,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
grpc_sockaddr_in* addr =
reinterpret_cast<grpc_sockaddr_in*>(resolved_addr.addr);
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(grpc_sockaddr_in);
addr->sin_family = GRPC_AF_INET;
grpc_sockaddr_set_port(&resolved_addr, proxy_port);
int port;

@ -266,6 +266,7 @@ grpc_cc_test(
language = "C++",
tags = [
"endpoint_test",
"event_engine_listener_test",
"no_mac", # TODO(jtattermusch): Reenable once https://github.com/grpc/grpc/issues/21282 is fixed.
"no_windows",
],
@ -313,7 +314,10 @@ grpc_cc_test(
srcs = ["tcp_server_posix_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = ["no_windows"],
tags = [
"event_engine_listener_test",
"no_windows",
],
deps = [
"//:gpr",
"//:grpc",

@ -16,7 +16,10 @@
//
//
#include "absl/time/time.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
@ -37,11 +40,14 @@
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/iomgr/buffer_list.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@ -524,11 +530,15 @@ static void write_test(size_t num_bytes, size_t slice_size,
static_cast<grpc_resource_quota*>(a[1].value.pointer.p));
}
struct release_fd_arg {
std::atomic<int> fd_released_done{0};
grpc_core::Notification notify;
};
void on_fd_released(void* arg, grpc_error_handle /*errors*/) {
int* done = static_cast<int*>(arg);
*done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
release_fd_arg* rel_fd = static_cast<release_fd_arg*>(arg);
rel_fd->fd_released_done = 1;
rel_fd->notify.Notify();
}
// Do a read_test, then release fd and try to read/write again. Verify that
@ -543,8 +553,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_timeout_seconds_to_deadline(20));
grpc_core::ExecCtx exec_ctx;
grpc_closure fd_released_cb;
int fd_released_done = 0;
GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
release_fd_arg rel_fd;
GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &rel_fd,
grpc_schedule_on_exec_ctx);
gpr_log(GPR_INFO,
@ -561,7 +571,22 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[1].type = GRPC_ARG_POINTER;
a[1].value.pointer.p = grpc_resource_quota_create("test");
a[1].value.pointer.vtable = grpc_resource_quota_arg_vtable();
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// Create an event engine wrapped endpoint to test release_fd operations.
auto eeep =
reinterpret_cast<
grpc_event_engine::experimental::PosixEventEngineWithFdSupport*>(
grpc_event_engine::experimental::GetDefaultEventEngine().get())
->CreatePosixEndpointFromFd(
sv[1],
grpc_event_engine::experimental::ChannelArgsEndpointConfig(
grpc_core::ChannelArgs::FromC(&args)),
memory_quota->CreateMemoryAllocator("test"));
ep = grpc_event_engine::experimental::grpc_event_engine_endpoint_create(
std::move(eeep));
} else {
ep = grpc_tcp_create(
grpc_fd_create(sv[1], "read_test", false),
TcpOptionsFromEndpointConfig(
@ -569,6 +594,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_core::ChannelArgs::FromC(&args))),
"test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
}
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@ -601,17 +627,9 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_destroy(&state.incoming);
grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(g_mu);
while (!fd_released_done) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
}
gpr_mu_unlock(g_mu);
GPR_ASSERT(fd_released_done == 1);
rel_fd.notify.WaitForNotificationWithTimeout(absl::Seconds(20));
GPR_ASSERT(rel_fd.fd_released_done == 1);
GPR_ASSERT(fd == sv[1]);
written_bytes = fill_socket_partial(sv[0], num_bytes);
drain_socket_blocking(fd, written_bytes, written_bytes);
written_bytes = fill_socket_partial(fd, num_bytes);

@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include "src/core/lib/event_engine/shim.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#include "test/core/util/test_config.h"
@ -513,6 +514,11 @@ static void test_pre_allocated_inet_fd() {
struct sockaddr_in6* addr =
reinterpret_cast<struct sockaddr_in6*>(resolved_addr.addr);
grpc_tcp_server* s;
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// TODO(vigneshbabu): Skip the test when event engine is enabled.
// Pre-allocated fd support will be added to event engine later.
return;
}
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);
@ -610,6 +616,11 @@ static void test_pre_allocated_unix_fd() {
struct sockaddr_un* addr =
reinterpret_cast<struct sockaddr_un*>(resolved_addr.addr);
grpc_tcp_server* s;
if (grpc_event_engine::experimental::UseEventEngineListener()) {
// TODO(vigneshbabu): Skip the test when event engine is enabled.
// Pre-allocated fd support will be added to event engine later.
return;
}
auto args = grpc_core::CoreConfiguration::Get()
.channel_args_preconditioning()
.PreconditionChannelArgs(nullptr);

@ -154,6 +154,7 @@ void bad_server_thread(void* vargs) {
ASSERT_TRUE(error.ok());
memset(&resolved_addr, 0, sizeof(resolved_addr));
addr->sa_family = GRPC_AF_INET;
resolved_addr.len = sizeof(grpc_sockaddr_in);
error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
ASSERT_TRUE(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
ASSERT_GT(port, 0);

@ -44,7 +44,9 @@
static void on_server_destroyed(void* data, grpc_error_handle /*error*/) {
test_tcp_server* server = static_cast<test_tcp_server*>(data);
gpr_mu_lock(server->mu);
server->shutdown = true;
gpr_mu_unlock(server->mu);
}
void test_tcp_server_init(test_tcp_server* server,
@ -116,10 +118,14 @@ void test_tcp_server_destroy(test_tcp_server* server) {
shutdown_deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(5, GPR_TIMESPAN));
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(server->mu);
while (!server->shutdown &&
gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) {
test_tcp_server_poll(server, 1000);
gpr_mu_unlock(server->mu);
test_tcp_server_poll(server, 100);
gpr_mu_lock(server->mu);
}
gpr_mu_unlock(server->mu);
grpc_pollset_shutdown(server->pollset[0],
GRPC_CLOSURE_CREATE(finish_pollset, server->pollset[0],
grpc_schedule_on_exec_ctx));

@ -2248,6 +2248,8 @@ src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/closure.cc \
src/core/lib/iomgr/event_engine_shims/closure.h \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.h \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \

@ -2030,6 +2030,8 @@ src/core/lib/iomgr/ev_poll_posix.h \
src/core/lib/iomgr/ev_posix.cc \
src/core/lib/iomgr/ev_posix.h \
src/core/lib/iomgr/ev_windows.cc \
src/core/lib/iomgr/event_engine_shims/closure.cc \
src/core/lib/iomgr/event_engine_shims/closure.h \
src/core/lib/iomgr/event_engine_shims/endpoint.cc \
src/core/lib/iomgr/event_engine_shims/endpoint.h \
src/core/lib/iomgr/event_engine_shims/tcp_client.cc \

Loading…
Cancel
Save