diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 0a99fe13271..7904c19931e 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -778,6 +778,7 @@ libs: - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h + - src/core/lib/event_engine/posix.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/event_poller.h @@ -2086,6 +2087,7 @@ libs: - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h + - src/core/lib/event_engine/posix.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/event_poller.h @@ -3522,6 +3524,7 @@ libs: - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h + - src/core/lib/event_engine/posix.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/event_poller.h @@ -7261,6 +7264,7 @@ targets: - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h + - src/core/lib/event_engine/posix.h - src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h - src/core/lib/event_engine/posix_engine/ev_poll_posix.h - src/core/lib/event_engine/posix_engine/event_poller.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 132dee3180c..cc887428a80 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -728,6 +728,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', + 'src/core/lib/event_engine/posix.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/event_poller.h', @@ -1640,6 +1641,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', + 'src/core/lib/event_engine/posix.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/event_poller.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 6eda95ee7dd..44ab0ce34c9 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1118,6 +1118,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/poller.h', + 'src/core/lib/event_engine/posix.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.cc', @@ -2309,6 +2310,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', + 'src/core/lib/event_engine/posix.h', 'src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h', 'src/core/lib/event_engine/posix_engine/ev_poll_posix.h', 'src/core/lib/event_engine/posix_engine/event_poller.h', diff --git a/grpc.gemspec b/grpc.gemspec index 09e8474ccaf..0459ad35a0c 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1029,6 +1029,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/handle_containers.h ) s.files += %w( src/core/lib/event_engine/memory_allocator.cc ) s.files += %w( src/core/lib/event_engine/poller.h ) + s.files += %w( src/core/lib/event_engine/posix.h ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h ) s.files += %w( src/core/lib/event_engine/posix_engine/ev_poll_posix.cc ) diff --git a/package.xml b/package.xml index 8715c0d645c..de42273f56b 100644 --- a/package.xml +++ b/package.xml @@ -1011,6 +1011,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index 5492d45fab4..b84862cefcd 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1725,6 +1725,7 @@ grpc_cc_library( "iomgr_port", "load_file", "memory_quota", + "posix_event_engine_base_hdrs", "posix_event_engine_closure", "posix_event_engine_event_poller", "posix_event_engine_internal_errqueue", @@ -1840,6 +1841,7 @@ grpc_cc_library( deps = [ "event_engine_tcp_socket_utils", "iomgr_port", + "posix_event_engine_base_hdrs", "posix_event_engine_closure", "posix_event_engine_endpoint", "posix_event_engine_event_poller", @@ -1876,6 +1878,7 @@ grpc_cc_library( "event_engine_utils", "init_internally", "iomgr_port", + "posix_event_engine_base_hdrs", "posix_event_engine_closure", "posix_event_engine_endpoint", "posix_event_engine_event_poller", diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc index c0a7d704f02..cababfdfe7b 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.cc +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.cc @@ -1168,12 +1168,15 @@ void PosixEndpointImpl::Write( } } -void PosixEndpointImpl::MaybeShutdown(absl::Status why) { +void PosixEndpointImpl::MaybeShutdown( + absl::Status why, + absl::AnyInvocable)> on_release_fd) { if (poller_->CanTrackErrors()) { ZerocopyDisableAndWaitForRemaining(); stop_error_notification_.store(true, std::memory_order_release); handle_->SetHasError(); } + on_release_fd_ = std::move(on_release_fd); grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); handle_->ShutdownHandle(why); @@ -1181,7 +1184,13 @@ void PosixEndpointImpl::MaybeShutdown(absl::Status why) { } PosixEndpointImpl ::~PosixEndpointImpl() { - handle_->OrphanHandle(on_done_, nullptr, ""); + int release_fd = -1; + handle_->OrphanHandle(on_done_, + on_release_fd_ == nullptr ? nullptr : &release_fd, ""); + if (on_release_fd_ != nullptr) { + engine_->Run([on_release_fd = std::move(on_release_fd_), + release_fd]() mutable { on_release_fd(release_fd); }); + } delete on_read_; delete on_write_; delete on_error_; diff --git a/src/core/lib/event_engine/posix_engine/posix_endpoint.h b/src/core/lib/event_engine/posix_engine/posix_endpoint.h index 2d73af72e1e..644b3d43733 100644 --- a/src/core/lib/event_engine/posix_engine/posix_endpoint.h +++ b/src/core/lib/event_engine/posix_engine/posix_endpoint.h @@ -31,6 +31,7 @@ #include "absl/hash/hash.h" #include "absl/meta/type_traits.h" #include "absl/status/status.h" +#include "absl/status/statusor.h" #include #include @@ -38,6 +39,7 @@ #include #include +#include "src/core/lib/event_engine/posix.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" @@ -489,7 +491,11 @@ class PosixEndpointImpl : public grpc_core::RefCounted { return local_address_; } - void MaybeShutdown(absl::Status why); + int GetWrappedFd() { return fd_; } + + void MaybeShutdown( + absl::Status why, + absl::AnyInvocable release_fd)> on_release_fd); private: void UpdateRcvLowat() ABSL_EXCLUSIVE_LOCKS_REQUIRED(read_mu_); @@ -563,9 +569,11 @@ class PosixEndpointImpl : public grpc_core::RefCounted { void* outgoing_buffer_arg_ = nullptr; - // A counter which starts at 0. It is initialized the first time the socket - // options for collecting timestamps are set, and is incremented with each - // byte sent. + absl::AnyInvocable)> on_release_fd_ = nullptr; + + // A counter which starts at 0. It is initialized the first time the + // socket options for collecting timestamps are set, and is incremented + // with each byte sent. int bytes_counter_ = -1; // True if timestamping options are set on the socket. #ifdef GRPC_LINUX_ERRQUEUE @@ -587,8 +595,7 @@ class PosixEndpointImpl : public grpc_core::RefCounted { std::shared_ptr engine_; }; -class PosixEndpoint - : public grpc_event_engine::experimental::EventEngine::Endpoint { +class PosixEndpoint : public PosixEndpointWithFdSupport { public: PosixEndpoint( EventHandle* handle, PosixEngineClosure* on_shutdown, @@ -623,18 +630,31 @@ class PosixEndpoint return impl_->GetLocalAddress(); } + int GetWrappedFd() override { return impl_->GetWrappedFd(); } + + void Shutdown(absl::AnyInvocable release_fd)> + on_release_fd) override { + if (!shutdown_.exchange(true, std::memory_order_acq_rel)) { + impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"), + std::move(on_release_fd)); + } + } + ~PosixEndpoint() override { - impl_->MaybeShutdown(absl::InternalError("Endpoint closing")); + if (!shutdown_.exchange(true, std::memory_order_acq_rel)) { + impl_->MaybeShutdown(absl::FailedPreconditionError("Endpoint closing"), + nullptr); + } } private: PosixEndpointImpl* impl_; + std::atomic shutdown_{false}; }; #else // GRPC_POSIX_SOCKET_TCP -class PosixEndpoint - : public grpc_event_engine::experimental::EventEngine::Endpoint { +class PosixEndpoint : public PosixEndpointWithFdSupport { public: PosixEndpoint() = default; @@ -663,6 +683,17 @@ class PosixEndpoint "PosixEndpoint::GetLocalAddress not supported on this platform"); } + int GetWrappedFd() override { + GPR_ASSERT(false && + "PosixEndpoint::GetWrappedFd not supported on this platform"); + } + + void Shutdown(absl::AnyInvocable release_fd)> + on_release_fd) override { + GPR_ASSERT(false && + "PosixEndpoint::Shutdown not supported on this platform"); + } + ~PosixEndpoint() override = default; }; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 507da3326ff..d46ab7b7e88 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -32,11 +32,14 @@ #include #include +#include #include #include #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/poller.h" +#include "src/core/lib/event_engine/posix.h" +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/posix_engine/timer.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/trace.h" @@ -549,12 +552,56 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect( #endif // GRPC_POSIX_SOCKET_TCP } +std::unique_ptr +PosixEventEngine::CreatePosixEndpointFromFd(int fd, + const EndpointConfig& config, + MemoryAllocator memory_allocator) { +#ifdef GRPC_POSIX_SOCKET_TCP + GPR_DEBUG_ASSERT(fd > 0); + PosixEventPoller* poller = poller_manager_->Poller(); + GPR_DEBUG_ASSERT(poller != nullptr); + EventHandle* handle = + poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors()); + return CreatePosixEndpoint(handle, nullptr, shared_from_this(), + std::move(memory_allocator), + TcpOptionsFromEndpointConfig(config)); +#else // GRPC_POSIX_SOCKET_TCP + grpc_core::Crash( + "PosixEventEngine::CreatePosixEndpointFromFd is not supported on " + "this platform"); +#endif // GRPC_POSIX_SOCKET_TCP +} + absl::StatusOr> PosixEventEngine::CreateListener( Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, const EndpointConfig& config, std::unique_ptr memory_allocator_factory) { +#ifdef GRPC_POSIX_SOCKET_TCP + PosixEventEngineWithFdSupport::PosixAcceptCallback posix_on_accept = + [on_accept_cb = std::move(on_accept)]( + int /*listener_fd*/, std::unique_ptr ep, + bool /*is_external*/, MemoryAllocator allocator, + SliceBuffer* /*pending_data*/) mutable { + on_accept_cb(std::move(ep), std::move(allocator)); + }; + return std::make_unique( + std::move(posix_on_accept), std::move(on_shutdown), config, + std::move(memory_allocator_factory), poller_manager_->Poller(), + shared_from_this()); +#else // GRPC_POSIX_SOCKET_TCP + grpc_core::Crash( + "EventEngine::CreateListener is not supported on this platform"); +#endif // GRPC_POSIX_SOCKET_TCP +} + +absl::StatusOr> +PosixEventEngine::CreatePosixListener( + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const EndpointConfig& config, + std::unique_ptr memory_allocator_factory) { #ifdef GRPC_POSIX_SOCKET_TCP return std::make_unique( std::move(on_accept), std::move(on_shutdown), config, diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index e42f377fc9f..4e64ae24b62 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -35,6 +35,7 @@ #include #include "src/core/lib/event_engine/handle_containers.h" +#include "src/core/lib/event_engine/posix.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/timer_manager.h" #include "src/core/lib/event_engine/thread_pool.h" @@ -131,7 +132,7 @@ class PosixEnginePollerManager // All methods require an ExecCtx to already exist on the thread's stack. // TODO(ctiller): KeepsGrpcInitialized is an interim measure to ensure that // event engine is shut down before we shut down iomgr. -class PosixEventEngine final : public EventEngine, +class PosixEventEngine final : public PosixEventEngineWithFdSupport, public grpc_core::KeepsGrpcInitialized { public: class PosixDNSResolver : public EventEngine::DNSResolver { @@ -163,6 +164,10 @@ class PosixEventEngine final : public EventEngine, ~PosixEventEngine() override; + std::unique_ptr CreatePosixEndpointFromFd( + int fd, const EndpointConfig& config, + MemoryAllocator memory_allocator) override; + absl::StatusOr> CreateListener( Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, @@ -170,6 +175,14 @@ class PosixEventEngine final : public EventEngine, std::unique_ptr memory_allocator_factory) override; + absl::StatusOr> + CreatePosixListener( + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const EndpointConfig& config, + std::unique_ptr memory_allocator_factory) + override; + ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress& addr, const EndpointConfig& args, diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 6109922d3f1..7d982c8ae8f 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -14,6 +14,7 @@ #include +#include "src/core/lib/event_engine/posix.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -46,7 +47,7 @@ namespace grpc_event_engine { namespace experimental { PosixEngineListenerImpl::PosixEngineListenerImpl( - EventEngine::Listener::AcceptCallback on_accept, + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, absl::AnyInvocable on_shutdown, const grpc_event_engine::experimental::EndpointConfig& config, std::unique_ptr @@ -61,7 +62,8 @@ PosixEngineListenerImpl::PosixEngineListenerImpl( memory_allocator_factory_(std::move(memory_allocator_factory)) {} absl::StatusOr PosixEngineListenerImpl::Bind( - const EventEngine::ResolvedAddress& addr) { + const EventEngine::ResolvedAddress& addr, + PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) { EventEngine::ResolvedAddress res_addr = addr; EventEngine::ResolvedAddress addr6_v4mapped; int requested_port = ResolvedAddressGetPort(res_addr); @@ -89,6 +91,9 @@ absl::StatusOr PosixEngineListenerImpl::Bind( } auto used_port = ResolvedAddressIsWildcard(res_addr); + // Update the callback. Any subsequent new sockets created and added to + // acceptors_ in this function will invoke the new callback. + acceptors_.UpdateOnAppendCallback(std::move(on_bind_new_fd)); if (used_port.has_value()) { requested_port = *used_port; return ListenerContainerAddWildcardAddresses(acceptors_, options_, @@ -183,13 +188,52 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( // Call on_accept_ and then resume accepting new connections by continuing // the parent for-loop. listener_->on_accept_( - std::move(endpoint), + /*listener_fd=*/handle_->WrappedFd(), /*endpoint=*/std::move(endpoint), + /*is_external=*/false, + /*memory_allocator=*/ listener_->memory_allocator_factory_->CreateMemoryAllocator( - absl::StrCat("on-accept-tcp-server-connection: ", peer_name))); + absl::StrCat("on-accept-tcp-server-connection: ", peer_name)), + /*pending_data=*/nullptr); } GPR_UNREACHABLE_CODE(return); } +absl::Status PosixEngineListenerImpl::HandleExternalConnection( + int listener_fd, int fd, SliceBuffer* pending_data) { + if (listener_fd < 0) { + return absl::UnknownError(absl::StrCat( + "HandleExternalConnection: Invalid listener socket: ", listener_fd)); + } + if (fd < 0) { + return absl::UnknownError( + absl::StrCat("HandleExternalConnection: Invalid peer socket: ", fd)); + } + PosixSocketWrapper sock(fd); + (void)sock.SetSocketNoSigpipeIfPossible(); + auto peer_name = sock.PeerAddressString(); + if (!peer_name.ok()) { + return absl::UnknownError( + absl::StrCat("HandleExternalConnection: peer not connected: ", + peer_name.status().ToString())); + } + auto endpoint = CreatePosixEndpoint( + /*handle=*/poller_->CreateHandle(fd, *peer_name, + poller_->CanTrackErrors()), + /*on_shutdown=*/nullptr, /*engine=*/engine_, + /*allocator=*/ + memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( + "external:endpoint-tcp-server-connection: ", *peer_name)), + /*options=*/options_); + on_accept_( + /*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint), + /*is_external=*/true, + /*memory_allocator=*/ + memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat( + "external:on-accept-tcp-server-connection: ", *peer_name)), + /*pending_data=*/pending_data); + return absl::OkStatus(); +} + void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() { // The ShutdownHandle whould trigger any waiting notify_on_accept_ to get // scheduled with the not-OK status. diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h index 33a7a179011..02514efb875 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -33,7 +33,9 @@ #include #include #include +#include +#include "src/core/lib/event_engine/posix.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP @@ -52,7 +54,7 @@ class PosixEngineListenerImpl : public std::enable_shared_from_this { public: PosixEngineListenerImpl( - EventEngine::Listener::AcceptCallback on_accept, + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, absl::AnyInvocable on_shutdown, const grpc_event_engine::experimental::EndpointConfig& config, std::unique_ptr @@ -60,12 +62,17 @@ class PosixEngineListenerImpl PosixEventPoller* poller, std::shared_ptr engine); // Binds an address to the listener. This creates a ListenerSocket // and sets its fields appropriately. - absl::StatusOr Bind(const EventEngine::ResolvedAddress& addr); + absl::StatusOr Bind( + const EventEngine::ResolvedAddress& addr, + PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd); // Signals event manager to listen for connections on all created sockets. absl::Status Start(); // Trigger graceful shutdown of all asynchronous accept operations. void TriggerShutdown(); + absl::Status HandleExternalConnection(int listener_fd, int fd, + SliceBuffer* pending_data); + ~PosixEngineListenerImpl(); private: @@ -119,9 +126,18 @@ class PosixEngineListenerImpl public: explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener) : listener_(listener){}; + + void UpdateOnAppendCallback( + PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append) { + on_append_ = std::move(on_append); + } + void Append(ListenerSocket socket) override { acceptors_.push_back(new AsyncConnectionAcceptor( listener_->engine_, listener_->shared_from_this(), socket)); + if (on_append_) { + on_append_(socket.sock.Fd()); + } } absl::StatusOr Find( @@ -148,6 +164,7 @@ class PosixEngineListenerImpl } private: + PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append_; std::list acceptors_; PosixEngineListenerImpl* listener_; }; @@ -163,7 +180,7 @@ class PosixEngineListenerImpl // operation. ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_); // Callback to be invoked upon accepting a connection. - EventEngine::Listener::AcceptCallback on_accept_; + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept_; // Callback to be invoked upon shutdown of listener. absl::AnyInvocable on_shutdown_; // Set to true when the listener has started listening for new connections. @@ -175,12 +192,10 @@ class PosixEngineListenerImpl memory_allocator_factory_; }; -class PosixEngineListener - : public grpc_event_engine::experimental::EventEngine::Listener { +class PosixEngineListener : public PosixListenerWithFdSupport { public: PosixEngineListener( - grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback - on_accept, + PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, absl::AnyInvocable on_shutdown, const grpc_event_engine::experimental::EndpointConfig& config, std::unique_ptr @@ -189,16 +204,34 @@ class PosixEngineListener : impl_(std::make_shared( std::move(on_accept), std::move(on_shutdown), config, std::move(memory_allocator_factory), poller, std::move(engine))) {} - ~PosixEngineListener() override { impl_->TriggerShutdown(); }; + ~PosixEngineListener() override { ShutdownListeningFds(); }; absl::StatusOr Bind( const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) override { - return impl_->Bind(addr); + return impl_->Bind(addr, nullptr); + } + absl::StatusOr BindWithFd( + const EventEngine::ResolvedAddress& addr, + PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) + override { + return impl_->Bind(addr, std::move(on_bind_new_fd)); + } + absl::Status HandleExternalConnection(int listener_fd, int fd, + SliceBuffer* pending_data) override { + return impl_->HandleExternalConnection(listener_fd, fd, pending_data); } absl::Status Start() override { return impl_->Start(); } + void ShutdownListeningFds() override { + if (!shutdown_.exchange(true, std::memory_order_acq_rel)) { + impl_->TriggerShutdown(); + } + } + private: std::shared_ptr impl_; + // Set to true when the listener had been explicitly shutdown. + std::atomic shutdown_{false}; }; #else // GRPC_POSIX_SOCKET_TCP @@ -219,6 +252,25 @@ class PosixEngineListener grpc_core::Crash( "EventEngine::Listener::Start not supported on this platform"); } + absl::StatusOr BindWithFd( + const EventEngine::ResolvedAddress& /*addr*/, + PosixListenerWithFdSupport::OnPosixBindNewFdCallback + /*on_bind_new_fd*/) override { + grpc_core::Crash( + "PosixEngineListener::BindWithFd not supported on this " + "platform"); + } + absl::Status HandleExternalConnection( + int /*listener_fd*/, int /*fd*/, SliceBuffer* /*pending_data*/) override { + grpc_core::Crash( + "PosixEngineListener::HandleExternalConnection not " + "supported on this platform"); + } + void ShutdownListeningFds() override { + grpc_core::Crash( + "PosixEngineListener::ShutdownListeningFds not supported on " + "this platform"); + } }; #endif diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 2d2d9e81874..ac2ee780040 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2023,6 +2023,7 @@ src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/poller.h \ +src/core/lib/event_engine/posix.h \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \ src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 54c80526384..ae892fe586d 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1802,6 +1802,7 @@ src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/poller.h \ +src/core/lib/event_engine/posix.h \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc \ src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h \ src/core/lib/event_engine/posix_engine/ev_poll_posix.cc \