Add new function to EventEngineSupportsFdExtension query extension that takes in a configured and bound but unconnected fd, connects to the given remote peer, and returns a resulting EventEngine::Endpoint.

PiperOrigin-RevId: 683229732
pull/37298/head
Alisha Nanda 4 months ago committed by Copybara-Service
parent dbb5164ac7
commit 64b7fce68c
  1. 17
      src/core/lib/event_engine/extensions/supports_fd.h
  2. 51
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  3. 15
      src/core/lib/event_engine/posix_engine/posix_engine.h

@ -119,6 +119,23 @@ class EventEngineSupportsFdExtension {
virtual std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) = 0;
/// Creates an EventEngine::Endpoint from a file descriptor that is configured
/// and bound locally but not yet connected to a remote peer. Returns a
/// connection handle to cancel the connection attempt if needed. Created
/// endpoint will be returned through `on_connect` callback.
/// \a fd - The socket file descriptor.
/// \a on_connect - The callback to invoke once fd is connected to peer.
/// \a addr - The remote peer to connect to. This should be the mapped peer
/// address returned when creating a new socket.
/// \a config - Additional configuration to be applied to the endpoint.
/// \a memory_allocator - The endpoint may use the provided memory allocator
/// to track memory allocations.
/// \a timeout - The timeout to use for the connection attempt.
virtual EventEngine::ConnectionHandle CreateEndpointFromUnconnectedFd(
int fd, EventEngine::OnConnectCallback on_connect,
const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
MemoryAllocator memory_allocator, EventEngine::Duration timeout) = 0;
/// Called when the posix listener has accepted a new client connection.
/// \a listener_fd - The listening socket fd that accepted the new client
/// connection.

@ -238,14 +238,16 @@ void AsyncConnect::OnWritable(absl::Status status)
}
}
EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
PosixSocketWrapper sock, OnConnectCallback on_connect, ResolvedAddress addr,
MemoryAllocator&& allocator, const PosixTcpOptions& options,
Duration timeout) {
EventEngine::ConnectionHandle
PosixEventEngine::CreateEndpointFromUnconnectedFdInternal(
int fd, EventEngine::OnConnectCallback on_connect,
const EventEngine::ResolvedAddress& addr,
const PosixTcpOptions& tcp_options, MemoryAllocator memory_allocator,
EventEngine::Duration timeout) {
int err;
int connect_errno;
do {
err = connect(sock.Fd(), addr.address(), addr.size());
err = connect(fd, addr.address(), addr.size());
} while (err < 0 && errno == EINTR);
connect_errno = (err < 0) ? errno : 0;
@ -261,16 +263,15 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
std::string name = absl::StrCat("tcp-client:", addr_uri.value());
PosixEventPoller* poller = poller_manager_->Poller();
EventHandle* handle =
poller->CreateHandle(sock.Fd(), name, poller->CanTrackErrors());
poller->CreateHandle(fd, name, poller->CanTrackErrors());
if (connect_errno == 0) {
// Connection already succeded. Return 0 to discourage any cancellation
// Connection already succeeded. Return 0 to discourage any cancellation
// attempts.
Run([on_connect = std::move(on_connect),
ep = CreatePosixEndpoint(handle, nullptr, shared_from_this(),
std::move(allocator), options)]() mutable {
on_connect(std::move(ep));
});
ep = CreatePosixEndpoint(
handle, nullptr, shared_from_this(), std::move(memory_allocator),
tcp_options)]() mutable { on_connect(std::move(ep)); });
return EventEngine::ConnectionHandle::kInvalid;
}
if (connect_errno != EWOULDBLOCK && connect_errno != EINPROGRESS) {
@ -288,9 +289,10 @@ EventEngine::ConnectionHandle PosixEventEngine::ConnectInternal(
// Connection is still in progress.
int64_t connection_id =
last_connection_id_.fetch_add(1, std::memory_order_acq_rel);
AsyncConnect* ac = new AsyncConnect(
std::move(on_connect), shared_from_this(), executor_.get(), handle,
std::move(allocator), options, addr_uri.value(), connection_id);
AsyncConnect* ac =
new AsyncConnect(std::move(on_connect), shared_from_this(),
executor_.get(), handle, std::move(memory_allocator),
tcp_options, addr_uri.value(), connection_id);
int shard_number = connection_id % connection_shards_.size();
struct ConnectionShard* shard = &connection_shards_[shard_number];
{
@ -635,14 +637,29 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
status = socket.status()]() mutable { on_connect(status); });
return EventEngine::ConnectionHandle::kInvalid;
}
return ConnectInternal((*socket).sock, std::move(on_connect),
(*socket).mapped_target_addr,
std::move(memory_allocator), options, timeout);
return CreateEndpointFromUnconnectedFdInternal(
(*socket).sock.Fd(), std::move(on_connect), (*socket).mapped_target_addr,
options, std::move(memory_allocator), timeout);
#else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core::Crash("EventEngine::Connect is not supported on this platform");
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
EventEngine::ConnectionHandle PosixEventEngine::CreateEndpointFromUnconnectedFd(
int fd, EventEngine::OnConnectCallback on_connect,
const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
MemoryAllocator memory_allocator, EventEngine::Duration timeout) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
return CreateEndpointFromUnconnectedFdInternal(
fd, std::move(on_connect), addr, TcpOptionsFromEndpointConfig(config),
std::move(memory_allocator), timeout);
#else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core::Crash(
"EventEngine::CreateEndpointFromUnconnectedFd is not supported on this "
"platform");
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
std::unique_ptr<EventEngine::Endpoint>
PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config,

@ -174,6 +174,11 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) override;
ConnectionHandle CreateEndpointFromUnconnectedFd(
int fd, EventEngine::OnConnectCallback on_connect,
const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
MemoryAllocator memory_allocator, EventEngine::Duration timeout) override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
@ -235,12 +240,10 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
static void PollerWorkInternal(
std::shared_ptr<PosixEnginePollerManager> poller_manager);
ConnectionHandle ConnectInternal(
grpc_event_engine::experimental::PosixSocketWrapper sock,
OnConnectCallback on_connect, ResolvedAddress addr,
MemoryAllocator&& allocator,
const grpc_event_engine::experimental::PosixTcpOptions& options,
Duration timeout);
ConnectionHandle CreateEndpointFromUnconnectedFdInternal(
int fd, EventEngine::OnConnectCallback on_connect,
const EventEngine::ResolvedAddress& addr, const PosixTcpOptions& options,
MemoryAllocator memory_allocator, EventEngine::Duration timeout);
void OnConnectFinishInternal(int connection_handle);

Loading…
Cancel
Save