diff --git a/src/core/lib/event_engine/extensions/supports_fd.h b/src/core/lib/event_engine/extensions/supports_fd.h index c7a1bfdcd4f..070bce175de 100644 --- a/src/core/lib/event_engine/extensions/supports_fd.h +++ b/src/core/lib/event_engine/extensions/supports_fd.h @@ -119,6 +119,23 @@ class EventEngineSupportsFdExtension { virtual std::unique_ptr CreateEndpointFromFd( int fd, const EndpointConfig& config) = 0; + /// Creates an EventEngine::Endpoint from a file descriptor that is configured + /// and bound locally but not yet connected to a remote peer. Returns a + /// connection handle to cancel the connection attempt if needed. Created + /// endpoint will be returned through `on_connect` callback. + /// \a fd - The socket file descriptor. + /// \a on_connect - The callback to invoke once fd is connected to peer. + /// \a addr - The remote peer to connect to. This should be the mapped peer + /// address returned when creating a new socket. + /// \a config - Additional configuration to be applied to the endpoint. + /// \a memory_allocator - The endpoint may use the provided memory allocator + /// to track memory allocations. + /// \a timeout - The timeout to use for the connection attempt. + virtual EventEngine::ConnectionHandle CreateEndpointFromUnconnectedFd( + int fd, EventEngine::OnConnectCallback on_connect, + const EventEngine::ResolvedAddress& addr, const EndpointConfig& config, + MemoryAllocator memory_allocator, EventEngine::Duration timeout) = 0; + /// Called when the posix listener has accepted a new client connection. /// \a listener_fd - The listening socket fd that accepted the new client /// connection. diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index f2275975b0d..1c7f6df10b1 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -238,14 +238,16 @@ void AsyncConnect::OnWritable(absl::Status status) } } -EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal( - PosixSocketWrapper sock, OnConnectCallback on_connect, ResolvedAddress addr, - MemoryAllocator&& allocator, const PosixTcpOptions& options, - Duration timeout) { +EventEngine::ConnectionHandle +PosixEventEngine::CreateEndpointFromUnconnectedFdInternal( + int fd, EventEngine::OnConnectCallback on_connect, + const EventEngine::ResolvedAddress& addr, + const PosixTcpOptions& tcp_options, MemoryAllocator memory_allocator, + EventEngine::Duration timeout) { int err; int connect_errno; do { - err = connect(sock.Fd(), addr.address(), addr.size()); + err = connect(fd, addr.address(), addr.size()); } while (err < 0 && errno == EINTR); connect_errno = (err < 0) ? errno : 0; @@ -261,16 +263,15 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal( std::string name = absl::StrCat("tcp-client:", addr_uri.value()); PosixEventPoller* poller = poller_manager_->Poller(); EventHandle* handle = - poller->CreateHandle(sock.Fd(), name, poller->CanTrackErrors()); + poller->CreateHandle(fd, name, poller->CanTrackErrors()); if (connect_errno == 0) { - // Connection already succeded. Return 0 to discourage any cancellation + // Connection already succeeded. Return 0 to discourage any cancellation // attempts. Run([on_connect = std::move(on_connect), - ep = CreatePosixEndpoint(handle, nullptr, shared_from_this(), - std::move(allocator), options)]() mutable { - on_connect(std::move(ep)); - }); + ep = CreatePosixEndpoint( + handle, nullptr, shared_from_this(), std::move(memory_allocator), + tcp_options)]() mutable { on_connect(std::move(ep)); }); return EventEngine::ConnectionHandle::kInvalid; } if (connect_errno != EWOULDBLOCK && connect_errno != EINPROGRESS) { @@ -288,9 +289,10 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal( // Connection is still in progress. int64_t connection_id = last_connection_id_.fetch_add(1, std::memory_order_acq_rel); - AsyncConnect* ac = new AsyncConnect( - std::move(on_connect), shared_from_this(), executor_.get(), handle, - std::move(allocator), options, addr_uri.value(), connection_id); + AsyncConnect* ac = + new AsyncConnect(std::move(on_connect), shared_from_this(), + executor_.get(), handle, std::move(memory_allocator), + tcp_options, addr_uri.value(), connection_id); int shard_number = connection_id % connection_shards_.size(); struct ConnectionShard* shard = &connection_shards_[shard_number]; { @@ -635,14 +637,29 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect( status = socket.status()]() mutable { on_connect(status); }); return EventEngine::ConnectionHandle::kInvalid; } - return ConnectInternal((*socket).sock, std::move(on_connect), - (*socket).mapped_target_addr, - std::move(memory_allocator), options, timeout); + return CreateEndpointFromUnconnectedFdInternal( + (*socket).sock.Fd(), std::move(on_connect), (*socket).mapped_target_addr, + options, std::move(memory_allocator), timeout); #else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING grpc_core::Crash("EventEngine::Connect is not supported on this platform"); #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING } +EventEngine::ConnectionHandle PosixEventEngine::CreateEndpointFromUnconnectedFd( + int fd, EventEngine::OnConnectCallback on_connect, + const EventEngine::ResolvedAddress& addr, const EndpointConfig& config, + MemoryAllocator memory_allocator, EventEngine::Duration timeout) { +#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING + return CreateEndpointFromUnconnectedFdInternal( + fd, std::move(on_connect), addr, TcpOptionsFromEndpointConfig(config), + std::move(memory_allocator), timeout); +#else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING + grpc_core::Crash( + "EventEngine::CreateEndpointFromUnconnectedFd is not supported on this " + "platform"); +#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING +} + std::unique_ptr PosixEventEngine::CreatePosixEndpointFromFd(int fd, const EndpointConfig& config, diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index 82be0bdd0e5..63a142a11f2 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -174,6 +174,11 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport, std::unique_ptr CreateEndpointFromFd( int fd, const EndpointConfig& config) override; + ConnectionHandle CreateEndpointFromUnconnectedFd( + int fd, EventEngine::OnConnectCallback on_connect, + const EventEngine::ResolvedAddress& addr, const EndpointConfig& config, + MemoryAllocator memory_allocator, EventEngine::Duration timeout) override; + absl::StatusOr> CreateListener( Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, @@ -235,12 +240,10 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport, static void PollerWorkInternal( std::shared_ptr poller_manager); - ConnectionHandle ConnectInternal( - grpc_event_engine::experimental::PosixSocketWrapper sock, - OnConnectCallback on_connect, ResolvedAddress addr, - MemoryAllocator&& allocator, - const grpc_event_engine::experimental::PosixTcpOptions& options, - Duration timeout); + ConnectionHandle CreateEndpointFromUnconnectedFdInternal( + int fd, EventEngine::OnConnectCallback on_connect, + const EventEngine::ResolvedAddress& addr, const PosixTcpOptions& options, + MemoryAllocator memory_allocator, EventEngine::Duration timeout); void OnConnectFinishInternal(int connection_handle);