Move posix event engine to new interface defined in #31816 (#31890)

* update

* regenerate projects

* fix sanity

* review comments

* An EventEngine subclass to be implemented by all posix based event engines

* sanity

* comments

* update

* review comments

* re-word

* fix

* update

* review comments

* regenerate projects

* syntax fix

* add lock free event benchmark

* releasable mutex lock

* fix build isue

* update

* start

* regenerate projects

* update

* fix

* windows build

* update

* windows portability issue

* update

* update

* format

* update

* update

* update

* fix sanity

* regenerate projects

* update

* iwyu

* fix

* fix

* update

* fix sanity

* review comments
pull/31661/head
Vignesh Babu 2 years ago committed by GitHub
parent 63d1edd0f2
commit 5680a9b57b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build_autogenerated.yaml
  2. 2
      gRPC-C++.podspec
  3. 2
      gRPC-Core.podspec
  4. 1
      grpc.gemspec
  5. 1
      package.xml
  6. 3
      src/core/BUILD
  7. 13
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  8. 49
      src/core/lib/event_engine/posix_engine/posix_endpoint.h
  9. 47
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  10. 15
      src/core/lib/event_engine/posix_engine/posix_engine.h
  11. 52
      src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
  12. 70
      src/core/lib/event_engine/posix_engine/posix_engine_listener.h
  13. 1
      tools/doxygen/Doxyfile.c++.internal
  14. 1
      tools/doxygen/Doxyfile.core.internal

@ -778,6 +778,7 @@ libs:
- src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h - src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h - src/core/lib/event_engine/posix_engine/event_poller.h
@ -2086,6 +2087,7 @@ libs:
- src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h - src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h - src/core/lib/event_engine/posix_engine/event_poller.h
@ -3522,6 +3524,7 @@ libs:
- src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h - src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h - src/core/lib/event_engine/posix_engine/event_poller.h
@ -7261,6 +7264,7 @@ targets:
- src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h - src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h - src/core/lib/event_engine/posix_engine/event_poller.h

2
gRPC-C++.podspec generated

@ -728,6 +728,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix.h',
'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h',
'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h',
'src/core/lib/event_engine/posix_engine/event_poller.h', 'src/core/lib/event_engine/posix_engine/event_poller.h',
@ -1640,6 +1641,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix.h',
'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h',
'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h',
'src/core/lib/event_engine/posix_engine/event_poller.h', 'src/core/lib/event_engine/posix_engine/event_poller.h',

2
gRPC-Core.podspec generated

@ -1118,6 +1118,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix.h',
'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc',
'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h',
'src/core/lib/event_engine/posix_engine/ev_poll_posix.cc', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.cc',
@ -2309,6 +2310,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix.h',
'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h',
'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h',
'src/core/lib/event_engine/posix_engine/event_poller.h', 'src/core/lib/event_engine/posix_engine/event_poller.h',

1
grpc.gemspec generated

@ -1029,6 +1029,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/handle_containers.h ) s.files += %w( src/core/lib/event_engine/handle_containers.h )
s.files += %w( src/core/lib/event_engine/memory_allocator.cc ) s.files += %w( src/core/lib/event_engine/memory_allocator.cc )
s.files += %w( src/core/lib/event_engine/poller.h ) s.files += %w( src/core/lib/event_engine/poller.h )
s.files += %w( src/core/lib/event_engine/posix.h )
s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h )
s.files += %w( src/core/lib/event_engine/posix_engine/ev_poll_posix.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_poll_posix.cc )

1
package.xml generated

@ -1011,6 +1011,7 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/handle_containers.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/handle_containers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/memory_allocator.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/memory_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/poller.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/poller.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_poll_posix.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/ev_poll_posix.cc" role="src" />

@ -1725,6 +1725,7 @@ grpc_cc_library(
"iomgr_port", "iomgr_port",
"load_file", "load_file",
"memory_quota", "memory_quota",
"posix_event_engine_base_hdrs",
"posix_event_engine_closure", "posix_event_engine_closure",
"posix_event_engine_event_poller", "posix_event_engine_event_poller",
"posix_event_engine_internal_errqueue", "posix_event_engine_internal_errqueue",
@ -1840,6 +1841,7 @@ grpc_cc_library(
deps = [ deps = [
"event_engine_tcp_socket_utils", "event_engine_tcp_socket_utils",
"iomgr_port", "iomgr_port",
"posix_event_engine_base_hdrs",
"posix_event_engine_closure", "posix_event_engine_closure",
"posix_event_engine_endpoint", "posix_event_engine_endpoint",
"posix_event_engine_event_poller", "posix_event_engine_event_poller",
@ -1876,6 +1878,7 @@ grpc_cc_library(
"event_engine_utils", "event_engine_utils",
"init_internally", "init_internally",
"iomgr_port", "iomgr_port",
"posix_event_engine_base_hdrs",
"posix_event_engine_closure", "posix_event_engine_closure",
"posix_event_engine_endpoint", "posix_event_engine_endpoint",
"posix_event_engine_event_poller", "posix_event_engine_event_poller",

@ -1168,12 +1168,15 @@ void PosixEndpointImpl::Write(
} }
} }
void PosixEndpointImpl::MaybeShutdown(absl::Status why) { void PosixEndpointImpl::MaybeShutdown(
absl::Status why,
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd) {
if (poller_->CanTrackErrors()) { if (poller_->CanTrackErrors()) {
ZerocopyDisableAndWaitForRemaining(); ZerocopyDisableAndWaitForRemaining();
stop_error_notification_.store(true, std::memory_order_release); stop_error_notification_.store(true, std::memory_order_release);
handle_->SetHasError(); handle_->SetHasError();
} }
on_release_fd_ = std::move(on_release_fd);
grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus, grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE); GRPC_STATUS_UNAVAILABLE);
handle_->ShutdownHandle(why); handle_->ShutdownHandle(why);
@ -1181,7 +1184,13 @@ void PosixEndpointImpl::MaybeShutdown(absl::Status why) {
} }
PosixEndpointImpl ::~PosixEndpointImpl() { PosixEndpointImpl ::~PosixEndpointImpl() {
handle_->OrphanHandle(on_done_, nullptr, ""); int release_fd = -1;
handle_->OrphanHandle(on_done_,
on_release_fd_ == nullptr ? nullptr : &release_fd, "");
if (on_release_fd_ != nullptr) {
engine_->Run([on_release_fd = std::move(on_release_fd_),
release_fd]() mutable { on_release_fd(release_fd); });
}
delete on_read_; delete on_read_;
delete on_write_; delete on_write_;
delete on_error_; delete on_error_;

@ -31,6 +31,7 @@
#include "absl/hash/hash.h" #include "absl/hash/hash.h"
#include "absl/meta/type_traits.h" #include "absl/meta/type_traits.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
@ -38,6 +39,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
@ -489,7 +491,11 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
return local_address_; return local_address_;
} }
void MaybeShutdown(absl::Status why); int GetWrappedFd() { return fd_; }
void MaybeShutdown(
absl::Status why,
absl::AnyInvocable<void(absl::StatusOr<int> release_fd)> on_release_fd);
private: private:
void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_);
@ -563,9 +569,11 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
void* outgoing_buffer_arg_ = nullptr; void* outgoing_buffer_arg_ = nullptr;
// A counter which starts at 0. It is initialized the first time the socket absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_ = nullptr;
// options for collecting timestamps are set, and is incremented with each
// byte sent. // A counter which starts at 0. It is initialized the first time the
// socket options for collecting timestamps are set, and is incremented
// with each byte sent.
int bytes_counter_ = -1; int bytes_counter_ = -1;
// True if timestamping options are set on the socket. // True if timestamping options are set on the socket.
#ifdef GRPC_LINUX_ERRQUEUE #ifdef GRPC_LINUX_ERRQUEUE
@ -587,8 +595,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
}; };
class PosixEndpoint class PosixEndpoint : public PosixEndpointWithFdSupport {
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public: public:
PosixEndpoint( PosixEndpoint(
EventHandle* handle, PosixEngineClosure* on_shutdown, EventHandle* handle, PosixEngineClosure* on_shutdown,
@ -623,18 +630,31 @@ class PosixEndpoint
return impl_->GetLocalAddress(); return impl_->GetLocalAddress();
} }
int GetWrappedFd() override { return impl_->GetWrappedFd(); }
void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) override {
if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
std::move(on_release_fd));
}
}
~PosixEndpoint() override { ~PosixEndpoint() override {
impl_->MaybeShutdown(absl::InternalError("Endpoint closing")); if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"),
nullptr);
}
} }
private: private:
PosixEndpointImpl* impl_; PosixEndpointImpl* impl_;
std::atomic<bool> shutdown_{false};
}; };
#else // GRPC_POSIX_SOCKET_TCP #else // GRPC_POSIX_SOCKET_TCP
class PosixEndpoint class PosixEndpoint : public PosixEndpointWithFdSupport {
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public: public:
PosixEndpoint() = default; PosixEndpoint() = default;
@ -663,6 +683,17 @@ class PosixEndpoint
"PosixEndpoint::GetLocalAddress not supported on this platform"); "PosixEndpoint::GetLocalAddress not supported on this platform");
} }
int GetWrappedFd() override {
GPR_ASSERT(false &&
"PosixEndpoint::GetWrappedFd not supported on this platform");
}
void Shutdown(absl::AnyInvocable<void(absl::StatusOr<int> release_fd)>
on_release_fd) override {
GPR_ASSERT(false &&
"PosixEndpoint::Shutdown not supported on this platform");
}
~PosixEndpoint() override = default; ~PosixEndpoint() override = default;
}; };

@ -32,11 +32,14 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/posix_engine/timer.h" #include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h" #include "src/core/lib/event_engine/trace.h"
@ -549,12 +552,56 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
#endif // GRPC_POSIX_SOCKET_TCP #endif // GRPC_POSIX_SOCKET_TCP
} }
std::unique_ptr<PosixEndpointWithFdSupport>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,
MemoryAllocator memory_allocator) {
#ifdef GRPC_POSIX_SOCKET_TCP
GPR_DEBUG_ASSERT(fd > 0);
PosixEventPoller* poller = poller_manager_->Poller();
GPR_DEBUG_ASSERT(poller != nullptr);
EventHandle* handle =
poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors());
return CreatePosixEndpoint(handle, nullptr, shared_from_this(),
std::move(memory_allocator),
TcpOptionsFromEndpointConfig(config));
#else // GRPC_POSIX_SOCKET_TCP
grpc_core::Crash(
"PosixEventEngine::CreatePosixEndpointFromFd is not supported on "
"this platform");
#endif // GRPC_POSIX_SOCKET_TCP
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreateListener( PosixEventEngine::CreateListener(
Listener::AcceptCallback on_accept, Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown, absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config, const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) { std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
#ifdef GRPC_POSIX_SOCKET_TCP
PosixEventEngineWithFdSupport::PosixAcceptCallback posix_on_accept =
[on_accept_cb = std::move(on_accept)](
int /*listener_fd*/, std::unique_ptr<EventEngine::Endpoint> ep,
bool /*is_external*/, MemoryAllocator allocator,
SliceBuffer* /*pending_data*/) mutable {
on_accept_cb(std::move(ep), std::move(allocator));
};
return std::make_unique<PosixEngineListener>(
std::move(posix_on_accept), std::move(on_shutdown), config,
std::move(memory_allocator_factory), poller_manager_->Poller(),
shared_from_this());
#else // GRPC_POSIX_SOCKET_TCP
grpc_core::Crash(
"EventEngine::CreateListener is not supported on this platform");
#endif // GRPC_POSIX_SOCKET_TCP
}
absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
PosixEventEngine::CreatePosixListener(
PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
#ifdef GRPC_POSIX_SOCKET_TCP #ifdef GRPC_POSIX_SOCKET_TCP
return std::make_unique<PosixEngineListener>( return std::make_unique<PosixEngineListener>(
std::move(on_accept), std::move(on_shutdown), config, std::move(on_accept), std::move(on_shutdown), config,

@ -35,6 +35,7 @@
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/event_engine/handle_containers.h" #include "src/core/lib/event_engine/handle_containers.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
#include "src/core/lib/event_engine/thread_pool.h" #include "src/core/lib/event_engine/thread_pool.h"
@ -131,7 +132,7 @@ class PosixEnginePollerManager
// All methods require an ExecCtx to already exist on the thread's stack. // All methods require an ExecCtx to already exist on the thread's stack.
// TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that // TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that
// event engine is shut down before we shut down iomgr. // event engine is shut down before we shut down iomgr.
class PosixEventEngine final : public EventEngine, class PosixEventEngine final : public PosixEventEngineWithFdSupport,
public grpc_core::KeepsGrpcInitialized { public grpc_core::KeepsGrpcInitialized {
public: public:
class PosixDNSResolver : public EventEngine::DNSResolver { class PosixDNSResolver : public EventEngine::DNSResolver {
@ -163,6 +164,10 @@ class PosixEventEngine final : public EventEngine,
~PosixEventEngine() override; ~PosixEventEngine() override;
std::unique_ptr<PosixEndpointWithFdSupport> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept, Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown, absl::AnyInvocable<void(absl::Status)> on_shutdown,
@ -170,6 +175,14 @@ class PosixEventEngine final : public EventEngine,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override; override;
absl::StatusOr<std::unique_ptr<PosixListenerWithFdSupport>>
CreatePosixListener(
PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
override;
ConnectionHandle Connect(OnConnectCallback on_connect, ConnectionHandle Connect(OnConnectCallback on_connect,
const ResolvedAddress& addr, const ResolvedAddress& addr,
const EndpointConfig& args, const EndpointConfig& args,

@ -14,6 +14,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP #ifdef GRPC_POSIX_SOCKET_TCP
@ -46,7 +47,7 @@ namespace grpc_event_engine {
namespace experimental { namespace experimental {
PosixEngineListenerImpl::PosixEngineListenerImpl( PosixEngineListenerImpl::PosixEngineListenerImpl(
EventEngine::Listener::AcceptCallback on_accept, PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown, absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config, const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
@ -61,7 +62,8 @@ PosixEngineListenerImpl::PosixEngineListenerImpl(
memory_allocator_factory_(std::move(memory_allocator_factory)) {} memory_allocator_factory_(std::move(memory_allocator_factory)) {}
absl::StatusOr<int> PosixEngineListenerImpl::Bind( absl::StatusOr<int> PosixEngineListenerImpl::Bind(
const EventEngine::ResolvedAddress& addr) { const EventEngine::ResolvedAddress& addr,
PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) {
EventEngine::ResolvedAddress res_addr = addr; EventEngine::ResolvedAddress res_addr = addr;
EventEngine::ResolvedAddress addr6_v4mapped; EventEngine::ResolvedAddress addr6_v4mapped;
int requested_port = ResolvedAddressGetPort(res_addr); int requested_port = ResolvedAddressGetPort(res_addr);
@ -89,6 +91,9 @@ absl::StatusOr<int> PosixEngineListenerImpl::Bind(
} }
auto used_port = ResolvedAddressIsWildcard(res_addr); auto used_port = ResolvedAddressIsWildcard(res_addr);
// Update the callback. Any subsequent new sockets created and added to
// acceptors_ in this function will invoke the new callback.
acceptors_.UpdateOnAppendCallback(std::move(on_bind_new_fd));
if (used_port.has_value()) { if (used_port.has_value()) {
requested_port = *used_port; requested_port = *used_port;
return ListenerContainerAddWildcardAddresses(acceptors_, options_, return ListenerContainerAddWildcardAddresses(acceptors_, options_,
@ -183,13 +188,52 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
// Call on_accept_ and then resume accepting new connections by continuing // Call on_accept_ and then resume accepting new connections by continuing
// the parent for-loop. // the parent for-loop.
listener_->on_accept_( listener_->on_accept_(
std::move(endpoint), /*listener_fd=*/handle_->WrappedFd(), /*endpoint=*/std::move(endpoint),
/*is_external=*/false,
/*memory_allocator=*/
listener_->memory_allocator_factory_->CreateMemoryAllocator( listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("on-accept-tcp-server-connection: ", peer_name))); absl::StrCat("on-accept-tcp-server-connection: ", peer_name)),
/*pending_data=*/nullptr);
} }
GPR_UNREACHABLE_CODE(return); GPR_UNREACHABLE_CODE(return);
} }
absl::Status PosixEngineListenerImpl::HandleExternalConnection(
int listener_fd, int fd, SliceBuffer* pending_data) {
if (listener_fd < 0) {
return absl::UnknownError(absl::StrCat(
"HandleExternalConnection: Invalid listener socket: ", listener_fd));
}
if (fd < 0) {
return absl::UnknownError(
absl::StrCat("HandleExternalConnection: Invalid peer socket: ", fd));
}
PosixSocketWrapper sock(fd);
(void)sock.SetSocketNoSigpipeIfPossible();
auto peer_name = sock.PeerAddressString();
if (!peer_name.ok()) {
return absl::UnknownError(
absl::StrCat("HandleExternalConnection: peer not connected: ",
peer_name.status().ToString()));
}
auto endpoint = CreatePosixEndpoint(
/*handle=*/poller_->CreateHandle(fd, *peer_name,
poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/engine_,
/*allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:endpoint-tcp-server-connection: ", *peer_name)),
/*options=*/options_);
on_accept_(
/*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint),
/*is_external=*/true,
/*memory_allocator=*/
memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
"external:on-accept-tcp-server-connection: ", *peer_name)),
/*pending_data=*/pending_data);
return absl::OkStatus();
}
void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() { void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() {
// The ShutdownHandle whould trigger any waiting notify_on_accept_ to get // The ShutdownHandle whould trigger any waiting notify_on_accept_ to get
// scheduled with the not-OK status. // scheduled with the not-OK status.

@ -33,7 +33,9 @@
#include <grpc/event_engine/endpoint_config.h> #include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h> #include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/iomgr/port.h" #include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP #ifdef GRPC_POSIX_SOCKET_TCP
@ -52,7 +54,7 @@ class PosixEngineListenerImpl
: public std::enable_shared_from_this<PosixEngineListenerImpl> { : public std::enable_shared_from_this<PosixEngineListenerImpl> {
public: public:
PosixEngineListenerImpl( PosixEngineListenerImpl(
EventEngine::Listener::AcceptCallback on_accept, PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown, absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config, const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
@ -60,12 +62,17 @@ class PosixEngineListenerImpl
PosixEventPoller* poller, std::shared_ptr<EventEngine> engine); PosixEventPoller* poller, std::shared_ptr<EventEngine> engine);
// Binds an address to the listener. This creates a ListenerSocket // Binds an address to the listener. This creates a ListenerSocket
// and sets its fields appropriately. // and sets its fields appropriately.
absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr); absl::StatusOr<int> Bind(
const EventEngine::ResolvedAddress& addr,
PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd);
// Signals event manager to listen for connections on all created sockets. // Signals event manager to listen for connections on all created sockets.
absl::Status Start(); absl::Status Start();
// Trigger graceful shutdown of all asynchronous accept operations. // Trigger graceful shutdown of all asynchronous accept operations.
void TriggerShutdown(); void TriggerShutdown();
absl::Status HandleExternalConnection(int listener_fd, int fd,
SliceBuffer* pending_data);
~PosixEngineListenerImpl(); ~PosixEngineListenerImpl();
private: private:
@ -119,9 +126,18 @@ class PosixEngineListenerImpl
public: public:
explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener) explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener)
: listener_(listener){}; : listener_(listener){};
void UpdateOnAppendCallback(
PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append) {
on_append_ = std::move(on_append);
}
void Append(ListenerSocket socket) override { void Append(ListenerSocket socket) override {
acceptors_.push_back(new AsyncConnectionAcceptor( acceptors_.push_back(new AsyncConnectionAcceptor(
listener_->engine_, listener_->shared_from_this(), socket)); listener_->engine_, listener_->shared_from_this(), socket));
if (on_append_) {
on_append_(socket.sock.Fd());
}
} }
absl::StatusOr<ListenerSocket> Find( absl::StatusOr<ListenerSocket> Find(
@ -148,6 +164,7 @@ class PosixEngineListenerImpl
} }
private: private:
PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append_;
std::list<AsyncConnectionAcceptor*> acceptors_; std::list<AsyncConnectionAcceptor*> acceptors_;
PosixEngineListenerImpl* listener_; PosixEngineListenerImpl* listener_;
}; };
@ -163,7 +180,7 @@ class PosixEngineListenerImpl
// operation. // operation.
ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_); ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_);
// Callback to be invoked upon accepting a connection. // Callback to be invoked upon accepting a connection.
EventEngine::Listener::AcceptCallback on_accept_; PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept_;
// Callback to be invoked upon shutdown of listener. // Callback to be invoked upon shutdown of listener.
absl::AnyInvocable<void(absl::Status)> on_shutdown_; absl::AnyInvocable<void(absl::Status)> on_shutdown_;
// Set to true when the listener has started listening for new connections. // Set to true when the listener has started listening for new connections.
@ -175,12 +192,10 @@ class PosixEngineListenerImpl
memory_allocator_factory_; memory_allocator_factory_;
}; };
class PosixEngineListener class PosixEngineListener : public PosixListenerWithFdSupport {
: public grpc_event_engine::experimental::EventEngine::Listener {
public: public:
PosixEngineListener( PosixEngineListener(
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown, absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config, const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
@ -189,16 +204,34 @@ class PosixEngineListener
: impl_(std::make_shared<PosixEngineListenerImpl>( : impl_(std::make_shared<PosixEngineListenerImpl>(
std::move(on_accept), std::move(on_shutdown), config, std::move(on_accept), std::move(on_shutdown), config,
std::move(memory_allocator_factory), poller, std::move(engine))) {} std::move(memory_allocator_factory), poller, std::move(engine))) {}
~PosixEngineListener() override { impl_->TriggerShutdown(); }; ~PosixEngineListener() override { ShutdownListeningFds(); };
absl::StatusOr<int> Bind( absl::StatusOr<int> Bind(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
override { override {
return impl_->Bind(addr); return impl_->Bind(addr, nullptr);
}
absl::StatusOr<int> BindWithFd(
const EventEngine::ResolvedAddress& addr,
PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)
override {
return impl_->Bind(addr, std::move(on_bind_new_fd));
}
absl::Status HandleExternalConnection(int listener_fd, int fd,
SliceBuffer* pending_data) override {
return impl_->HandleExternalConnection(listener_fd, fd, pending_data);
} }
absl::Status Start() override { return impl_->Start(); } absl::Status Start() override { return impl_->Start(); }
void ShutdownListeningFds() override {
if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
impl_->TriggerShutdown();
}
}
private: private:
std::shared_ptr<PosixEngineListenerImpl> impl_; std::shared_ptr<PosixEngineListenerImpl> impl_;
// Set to true when the listener had been explicitly shutdown.
std::atomic<bool> shutdown_{false};
}; };
#else // GRPC_POSIX_SOCKET_TCP #else // GRPC_POSIX_SOCKET_TCP
@ -219,6 +252,25 @@ class PosixEngineListener
grpc_core::Crash( grpc_core::Crash(
"EventEngine::Listener::Start not supported on this platform"); "EventEngine::Listener::Start not supported on this platform");
} }
absl::StatusOr<int> BindWithFd(
const EventEngine::ResolvedAddress& /*addr*/,
PosixListenerWithFdSupport::OnPosixBindNewFdCallback
/*on_bind_new_fd*/) override {
grpc_core::Crash(
"PosixEngineListener::BindWithFd not supported on this "
"platform");
}
absl::Status HandleExternalConnection(
int /*listener_fd*/, int /*fd*/, SliceBuffer* /*pending_data*/) override {
grpc_core::Crash(
"PosixEngineListener::HandleExternalConnection not "
"supported on this platform");
}
void ShutdownListeningFds() override {
grpc_core::Crash(
"PosixEngineListener::ShutdownListeningFds not supported on "
"this platform");
}
}; };
#endif #endif

@ -2023,6 +2023,7 @@ src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/handle_containers.h \
src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/poller.h \ src/core/lib/event_engine/poller.h \
src/core/lib/event_engine/posix.h \
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \ src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \

@ -1802,6 +1802,7 @@ src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/handle_containers.h \
src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/poller.h \ src/core/lib/event_engine/poller.h \
src/core/lib/event_engine/posix.h \
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \ src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \

Loading…
Cancel
Save