Util functions to help with posix event engine listener implementation (#31435)

* Util functions to help with posix engine listener implementation

* sanity

* update comments in posix_engine_listener_utils.h

* review comments

* iwyu

* revert prev commit

* iwyu

* update build

* regenerate projects

* minor fixes

* comments
pull/31536/head
Vignesh Babu 2 years ago committed by GitHub
parent 107beb7435
commit 7eb5fb5a25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 43
      CMakeLists.txt
  2. 19
      build_autogenerated.yaml
  3. 24
      src/core/BUILD
  4. 375
      src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
  5. 91
      src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
  6. 124
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  7. 34
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  8. 18
      test/core/event_engine/posix/BUILD
  9. 162
      test/core/event_engine/posix/posix_engine_listener_utils_test.cc
  10. 20
      test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
  11. 22
      tools/run_tests/generated/tests.json

43
CMakeLists.txt generated

@ -1071,6 +1071,9 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_endpoint_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_engine_listener_utils_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
@ -14870,6 +14873,46 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_engine_listener_utils_test
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/iomgr/socket_mutator.cc
test/core/event_engine/posix/posix_engine_listener_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(posix_engine_listener_utils_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(posix_engine_listener_utils_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -8633,6 +8633,25 @@ targets:
- linux
- posix
- mac
- name: posix_engine_listener_utils_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/iomgr/socket_mutator.h
src:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/iomgr/socket_mutator.cc
- test/core/event_engine/posix/posix_engine_listener_utils_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: posix_event_engine_test
gtest: true
build: test

@ -1703,6 +1703,30 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "posix_event_engine_listener_utils",
srcs = [
"lib/event_engine/posix_engine/posix_engine_listener_utils.cc",
],
hdrs = [
"lib/event_engine/posix_engine/posix_engine_listener_utils.h",
],
external_deps = [
"absl/cleanup",
"absl/status",
"absl/status:statusor",
"absl/strings",
],
deps = [
"iomgr_port",
"posix_event_engine_tcp_socket_utils",
"socket_mutator",
"status_helper",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
grpc_cc_library(
name = "posix_event_engine",
srcs = ["lib/event_engine/posix_engine/posix_engine.cc"],

@ -0,0 +1,375 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <cstring>
#include <string>
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <errno.h> // IWYU pragma: keep
#include <ifaddrs.h> // IWYU pragma: keep
#include <netinet/in.h> // IWYU pragma: keep
#include <sys/socket.h> // IWYU pragma: keep
#include "absl/strings/str_cat.h"
#endif
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
namespace {
using ResolvedAddress =
grpc_event_engine::experimental::EventEngine::ResolvedAddress;
using ListenerSocket = ListenerSocketsContainer::ListenerSocket;
#ifdef GRPC_HAVE_IFADDRS
// Bind to "::" to get a port number not used by any address.
absl::StatusOr<int> GetUnusedPort() {
ResolvedAddress wild = SockaddrMakeWild6(0);
PosixSocketWrapper::DSMode dsmode;
auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild,
SOCK_STREAM, 0, dsmode);
GRPC_RETURN_IF_ERROR(sock.status());
if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
wild = SockaddrMakeWild4(0);
}
if (bind(sock->Fd(), wild.address(), wild.size()) != 0) {
close(sock->Fd());
return absl::FailedPreconditionError(
absl::StrCat("bind(GetUnusedPort): ", std::strerror(errno)));
}
socklen_t len = wild.size();
if (getsockname(sock->Fd(), const_cast<sockaddr*>(wild.address()), &len) !=
0) {
close(sock->Fd());
return absl::FailedPreconditionError(
absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno)));
}
close(sock->Fd());
int port = SockaddrGetPort(wild);
if (port <= 0) {
return absl::FailedPreconditionError("Bad port");
}
return port;
}
bool SystemHasIfAddrs() { return true; }
#else // GRPC_HAVE_IFADDRS
bool SystemHasIfAddrs() { return false; }
#endif // GRPC_HAVE_IFADDRS
// get max listen queue size on linux
int InitMaxAcceptQueueSize() {
int n = SOMAXCONN;
char buf[64];
FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
int max_accept_queue_size;
if (fp == nullptr) {
// 2.4 kernel.
return SOMAXCONN;
}
if (fgets(buf, sizeof buf, fp)) {
char* end;
long i = strtol(buf, &end, 10);
if (i > 0 && i <= INT_MAX && end && *end == '\n') {
n = static_cast<int>(i);
}
}
fclose(fp);
max_accept_queue_size = n;
if (max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
gpr_log(GPR_INFO,
"Suspiciously small accept queue (%d) will probably lead to "
"connection drops",
max_accept_queue_size);
}
return max_accept_queue_size;
}
int GetMaxAcceptQueueSize() {
static const int kMaxAcceptQueueSize = InitMaxAcceptQueueSize();
return kMaxAcceptQueueSize;
}
// Prepare a recently-created socket for listening.
absl::Status PrepareSocket(const PosixTcpOptions& options,
ListenerSocket& socket) {
ResolvedAddress sockname_temp;
absl::Status error;
int fd = socket.sock.Fd();
GPR_ASSERT(fd >= 0);
bool close_fd = true;
socket.zero_copy_enabled = false;
socket.port = 0;
auto sock_cleanup = absl::MakeCleanup([&close_fd, fd]() -> void {
if (close_fd && fd >= 0) {
close(fd);
}
});
if (PosixSocketWrapper::IsSocketReusePortSupported() &&
options.allow_reuse_port && socket.addr.address()->sa_family != AF_UNIX) {
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReusePort(1));
}
#ifdef GRPC_LINUX_ERRQUEUE
if (!socket.sock.SetSocketZeroCopy().ok()) {
// it's not fatal, so just log it.
gpr_log(GPR_DEBUG, "Node does not support SO_ZEROCOPY, continuing.");
} else {
socket.zero_copy_enabled = true;
}
#endif
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNonBlocking(1));
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketCloexec(1));
if (socket.addr.address()->sa_family != AF_UNIX) {
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketLowLatency(1));
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReuseAddr(1));
socket.sock.TrySetSocketTcpUserTimeout(options, false);
}
GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible());
GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions(
GRPC_FD_SERVER_LISTENER_USAGE, options));
if (bind(fd, socket.addr.address(), socket.addr.size()) < 0) {
return absl::FailedPreconditionError(
absl::StrCat("Error in bind: ", std::strerror(errno)));
}
if (listen(fd, GetMaxAcceptQueueSize()) < 0) {
return absl::FailedPreconditionError(
absl::StrCat("Error in listen: ", std::strerror(errno)));
}
socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
if (getsockname(fd, const_cast<sockaddr*>(sockname_temp.address()), &len) <
0) {
return absl::FailedPreconditionError(
absl::StrCat("Error in getsockname: ", std::strerror(errno)));
}
socket.port = SockaddrGetPort(ResolvedAddress(sockname_temp.address(), len));
// No errors. Set close_fd to false to ensure the socket is not closed.
close_fd = false;
return absl::OkStatus();
}
} // namespace
absl::StatusOr<ListenerSocket> CreateAndPrepareListenerSocket(
const PosixTcpOptions& options, const ResolvedAddress& addr) {
ResolvedAddress addr4_copy;
ListenerSocket socket;
auto result = PosixSocketWrapper::CreateDualStackSocket(
nullptr, addr, SOCK_STREAM, 0, socket.dsmode);
if (!result.ok()) {
return result.status();
}
socket.sock = *result;
if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 &&
SockaddrIsV4Mapped(&addr, &addr4_copy)) {
socket.addr = addr4_copy;
} else {
socket.addr = addr;
}
GRPC_RETURN_IF_ERROR(PrepareSocket(options, socket));
GPR_ASSERT(socket.port > 0);
return socket;
}
absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
int requested_port) {
#ifdef GRPC_HAVE_IFADDRS
absl::Status op_status = absl::OkStatus();
struct ifaddrs* ifa = nullptr;
struct ifaddrs* ifa_it;
bool no_local_addresses = true;
int assigned_port = 0;
if (requested_port == 0) {
auto result = GetUnusedPort();
GRPC_RETURN_IF_ERROR(result.status());
requested_port = *result;
gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port);
}
if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
return absl::FailedPreconditionError(
absl::StrCat("getifaddrs: ", std::strerror(errno)));
}
for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) {
ResolvedAddress addr;
socklen_t len;
const char* ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
if (ifa_it->ifa_addr == nullptr) {
continue;
} else if (ifa_it->ifa_addr->sa_family == AF_INET) {
len = static_cast<socklen_t>(sizeof(sockaddr_in));
} else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
len = static_cast<socklen_t>(sizeof(sockaddr_in6));
} else {
continue;
}
memcpy(const_cast<sockaddr*>(addr.address()), ifa_it->ifa_addr, len);
SockaddrSetPort(addr, requested_port);
std::string addr_str = *SockaddrToString(&addr, false);
gpr_log(GPR_DEBUG,
"Adding local addr from interface %s flags 0x%x to server: %s",
ifa_name, ifa_it->ifa_flags, addr_str.c_str());
// We could have multiple interfaces with the same address (e.g.,
// bonding), so look for duplicates.
if (listener_sockets.Find(addr).ok()) {
gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s",
addr_str.c_str(), ifa_name);
continue;
}
auto result = CreateAndPrepareListenerSocket(options, addr);
if (!result.ok()) {
op_status = absl::FailedPreconditionError(
absl::StrCat("Failed to add listener: ", addr_str,
" due to error: ", result.status().message()));
break;
} else {
listener_sockets.Append(*result);
assigned_port = result->port;
no_local_addresses = false;
}
}
freeifaddrs(ifa);
GRPC_RETURN_IF_ERROR(op_status);
if (no_local_addresses) {
return absl::FailedPreconditionError("No local addresses");
}
return assigned_port;
#else
GPR_ASSERT(false && "System does not support ifaddrs");
#endif
}
absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
int requested_port) {
ResolvedAddress wild4 = SockaddrMakeWild4(requested_port);
ResolvedAddress wild6 = SockaddrMakeWild6(requested_port);
absl::StatusOr<ListenerSocket> v6_sock;
absl::StatusOr<ListenerSocket> v4_sock;
int assigned_port = 0;
if (SystemHasIfAddrs() && options.expand_wildcard_addrs) {
return ListenerContainerAddAllLocalAddresses(listener_sockets, options,
requested_port);
}
// Try listening on IPv6 first.
v6_sock = CreateAndPrepareListenerSocket(options, wild6);
if (v6_sock.ok()) {
listener_sockets.Append(*v6_sock);
requested_port = v6_sock->port;
assigned_port = v6_sock->port;
if (v6_sock->dsmode == PosixSocketWrapper::DSMODE_DUALSTACK ||
v6_sock->dsmode == PosixSocketWrapper::DSMODE_IPV4) {
return v6_sock->port;
}
}
// If we got a v6-only socket or nothing, try adding 0.0.0.0.
SockaddrSetPort(wild4, requested_port);
v4_sock = CreateAndPrepareListenerSocket(options, wild4);
if (v4_sock.ok()) {
assigned_port = v4_sock->port;
listener_sockets.Append(*v4_sock);
}
if (assigned_port > 0) {
if (!v6_sock.ok()) {
gpr_log(GPR_INFO,
"Failed to add :: listener, the environment may not support "
"IPv6: %s",
v6_sock.status().ToString().c_str());
}
if (!v4_sock.ok()) {
gpr_log(GPR_INFO,
"Failed to add 0.0.0.0 listener, "
"the environment may not support IPv4: %s",
v4_sock.status().ToString().c_str());
}
return assigned_port;
} else {
GPR_ASSERT(!v6_sock.ok());
GPR_ASSERT(!v4_sock.ok());
return absl::FailedPreconditionError(absl::StrCat(
"Failed to add any wildcard listeners: ", v6_sock.status().message(),
v4_sock.status().message()));
}
}
#else // GRPC_POSIX_SOCKET_UTILS_COMMON
absl::StatusOr<ListenerSocketsContainer::ListenerSocket>
CreateAndPrepareListenerSocket(const PosixTcpOptions& /*options*/,
const grpc_event_engine::experimental::
EventEngine::ResolvedAddress& /*addr*/) {
GPR_ASSERT(
false &&
"CreateAndPrepareListenerSocket is not supported on this platform");
}
absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
ListenerSocketsContainer& /*listener_sockets*/,
const PosixTcpOptions& /*options*/, int /*requested_port*/) {
GPR_ASSERT(false &&
"ListenerContainerAddWildcardAddresses is not supported on this "
"platform");
}
absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
ListenerSocketsContainer& /*listener_sockets*/,
const PosixTcpOptions& /*options*/, int /*requested_port*/) {
GPR_ASSERT(false &&
"ListenerContainerAddAllLocalAddresses is not supported on this "
"platform");
}
#endif // GRPC_POSIX_SOCKET_UTILS_COMMON
} // namespace posix_engine
} // namespace grpc_event_engine

@ -0,0 +1,91 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H
#include <grpc/support/port_platform.h>
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
namespace grpc_event_engine {
namespace posix_engine {
// This interface exists to allow different Event Engines to implement different
// custom interception operations while a socket is Appended. The
// listener util functions are defined over this interface and thus can be
// shared across multiple event engines.
class ListenerSocketsContainer {
public:
struct ListenerSocket {
// Listener socket fd
PosixSocketWrapper sock;
// Assigned/chosen listening port
int port;
// Socket configuration
bool zero_copy_enabled;
// Address at which the socket is listening for connections
grpc_event_engine::experimental::EventEngine::ResolvedAddress addr;
// Dual stack mode.
PosixSocketWrapper::DSMode dsmode;
};
// Adds a socket to the internal db of sockets associated with a listener.
virtual void Append(ListenerSocket socket) = 0;
// Returns a non-OK status if the socket cannot be found. Otherwise, returns
// the socket.
virtual absl::StatusOr<ListenerSocket> Find(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
addr) = 0;
virtual ~ListenerSocketsContainer() = default;
};
// Creates and configures a socket to be used by the Event Engine Listener. The
// type of the socket to create is determined by the by the passed address. The
// socket configuration is specified by passed tcp options. If successful, it
// returns a ListenerSocketsContainer::ListenerSocket type which holds the
// socket fd and its dsmode. If unsuccessful, it returns a Not-OK status.
absl::StatusOr<ListenerSocketsContainer::ListenerSocket>
CreateAndPrepareListenerSocket(
const PosixTcpOptions& options,
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr);
// Instead of creating and adding a socket bound to specific address, this
// function creates and adds a socket bound to the wildcard address on the
// server. The newly created socket is configured according to the passed
// options and added to the passed ListenerSocketsContainer object. The function
// returns the port at which the created socket listens for incoming
// connections.
absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
int requested_port);
// Get all addresses assigned to network interfaces on the machine and create
// and add a socket for each local address. Each newly created socket is
// configured according to the passed options and added to the passed
// ListenerSocketsContainer object. The requested_port is the port to use for
// every socket. If set to 0, a random port will be used for every socket.
// The function returns the chosen port number for all created sockets.
absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
int requested_port);
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H

@ -19,6 +19,7 @@
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdlib.h>
#include "absl/cleanup/cleanup.h"
#include "absl/status/statusor.h"
@ -58,6 +59,7 @@
#include "src/core/lib/gprpp/strerror.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/stat.h> // IWYU pragma: keep
#include <sys/un.h>
#endif
@ -195,7 +197,7 @@ int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec) {
int fd, flags;
socklen_t len = addr.size();
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
fd = accept(sockfd, const_cast<sockaddr*>(addr.address()), &len);
if (fd >= 0) {
if (nonblock) {
@ -224,7 +226,7 @@ int Accept4(int sockfd,
int flags = 0;
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
socklen_t len = addr.size();
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
return accept4(sockfd, const_cast<sockaddr*>(addr.address()), &len, flags);
}
@ -354,6 +356,124 @@ absl::StatusOr<std::string> SockaddrToString(
return out;
}
EventEngine::ResolvedAddress SockaddrMakeWild6(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in6* wild_out = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in6));
wild_out->sin6_family = AF_INET6;
wild_out->sin6_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
EventEngine::ResolvedAddress SockaddrMakeWild4(int port) {
EventEngine::ResolvedAddress resolved_wild_out;
sockaddr_in* wild_out = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_wild_out.address()));
GPR_ASSERT(port >= 0 && port < 65536);
memset(wild_out, 0, sizeof(sockaddr_in));
wild_out->sin_family = AF_INET;
wild_out->sin_port = htons(static_cast<uint16_t>(port));
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(wild_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr) {
const sockaddr* addr = resolved_addr.address();
switch (addr->sa_family) {
case AF_INET:
return ntohs((reinterpret_cast<const sockaddr_in*>(addr))->sin_port);
case AF_INET6:
return ntohs((reinterpret_cast<const sockaddr_in6*>(addr))->sin6_port);
#ifdef GRPC_HAVE_UNIX_SOCKET
case AF_UNIX:
return 1;
#endif
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in SockaddrGetPort",
addr->sa_family);
abort();
}
}
void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port) {
sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
switch (addr->sa_family) {
case AF_INET:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in*>(addr))->sin_port =
htons(static_cast<uint16_t>(port));
return;
case AF_INET6:
GPR_ASSERT(port >= 0 && port < 65536);
(reinterpret_cast<sockaddr_in6*>(addr))->sin6_port =
htons(static_cast<uint16_t>(port));
return;
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port",
addr->sa_family);
abort();
}
}
void UnlinkIfUnixDomainSocket(
const EventEngine::ResolvedAddress& resolved_addr) {
#ifdef GRPC_HAVE_UNIX_SOCKET
if (resolved_addr.address()->sa_family != AF_UNIX) {
return;
}
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(resolved_addr.address()));
// There is nothing to unlink for an abstract unix socket
if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
return;
}
struct stat st;
if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
unlink(un->sun_path);
}
#else
(void)resolved_addr;
#endif
}
absl::optional<int> SockaddrIsWildcard(
const EventEngine::ResolvedAddress& addr) {
const EventEngine::ResolvedAddress* resolved_addr = &addr;
EventEngine::ResolvedAddress addr4_normalized;
if (SockaddrIsV4Mapped(resolved_addr, &addr4_normalized)) {
resolved_addr = &addr4_normalized;
}
if (resolved_addr->address()->sa_family == AF_INET) {
// Check for 0.0.0.0
const sockaddr_in* addr4 =
reinterpret_cast<const sockaddr_in*>(resolved_addr->address());
if (addr4->sin_addr.s_addr != 0) {
return absl::nullopt;
}
return static_cast<int>(ntohs(addr4->sin_port));
} else if (resolved_addr->address()->sa_family == AF_INET6) {
// Check for ::
const sockaddr_in6* addr6 =
reinterpret_cast<const sockaddr_in6*>(resolved_addr->address());
int i;
for (i = 0; i < 16; i++) {
if (addr6->sin6_addr.s6_addr[i] != 0) {
return absl::nullopt;
}
}
return static_cast<int>(ntohs(addr6->sin6_port));
} else {
return absl::nullopt;
}
}
// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually

@ -23,6 +23,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
@ -159,12 +160,35 @@ bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out);
// Converts a EventEngine::ResolvedAddress into a newly-allocated human-readable
// string.
// Make wild card IPv6 address with specified port.
EventEngine::ResolvedAddress SockaddrMakeWild6(int port);
// Make wild card IPv4 address with specified port.
EventEngine::ResolvedAddress SockaddrMakeWild4(int port);
// Given a resolved address, return the port number in the address.
int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr);
// Modifes the passed address to use the specified port number. The
// operation would only succeed if the passed address is an IPv4 or Ipv6
// address. Otherwise the function call would abort fail.
void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port);
// Unlink the path pointed to by the given address if it refers to UDS path.
void UnlinkIfUnixDomainSocket(
const EventEngine::ResolvedAddress& resolved_addr);
// Returns the port number associated with the address if the given address is
// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt
absl::optional<int> SockaddrIsWildcard(
const EventEngine::ResolvedAddress& addr);
// Converts a EventEngine::ResolvedAddress into a newly-allocated
// human-readable string.
//
// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are recognized.
// If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are
// displayed as plain IPv4.
// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are
// recognized. If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6
// addresses are displayed as plain IPv4.
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* resolved_addr, bool normalize);

@ -191,3 +191,21 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "posix_engine_listener_utils_test",
srcs = ["posix_engine_listener_utils_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = False,
deps = [
"//src/core:event_engine_common",
"//src/core:posix_event_engine_listener_utils",
"//src/core:posix_event_engine_tcp_socket_utils",
"//src/core:socket_mutator",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,162 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <list>
#include <string>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/iomgr/port.h"
// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <ifaddrs.h>
#include <gtest/gtest.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "test/core/util/port.h"
namespace grpc_event_engine {
namespace posix_engine {
namespace {
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
class TestListenerSocketsContainer : public ListenerSocketsContainer {
public:
void Append(ListenerSocket socket) override { sockets_.push_back(socket); }
absl::StatusOr<ListenerSocket> Find(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
override {
for (auto socket = sockets_.begin(); socket != sockets_.end(); ++socket) {
if (socket->addr.size() == addr.size() &&
memcmp(socket->addr.address(), addr.address(), addr.size()) == 0) {
return *socket;
}
}
return absl::NotFoundError("Socket not found!");
}
int Size() { return static_cast<int>(sockets_.size()); }
std::list<ListenerSocket>::const_iterator begin() { return sockets_.begin(); }
std::list<ListenerSocket>::const_iterator end() { return sockets_.end(); }
private:
std::list<ListenerSocket> sockets_;
};
} // namespace
TEST(PosixEngineListenerUtils, ListenerContainerAddWildcardAddressesTest) {
TestListenerSocketsContainer listener_sockets;
int port = grpc_pick_unused_port_or_die();
ChannelArgsEndpointConfig config;
auto result = ListenerContainerAddWildcardAddresses(
listener_sockets, TcpOptionsFromEndpointConfig(config), port);
EXPECT_TRUE(result.ok());
EXPECT_GT(*result, 0);
port = *result;
EXPECT_GE(listener_sockets.Size(), 1);
EXPECT_LE(listener_sockets.Size(), 2);
for (auto socket = listener_sockets.begin(); socket != listener_sockets.end();
++socket) {
ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 ||
(*socket).addr.address()->sa_family == AF_INET);
if ((*socket).addr.address()->sa_family == AF_INET6) {
EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(),
absl::StrCat("[::]:", std::to_string(port)));
} else if ((*socket).addr.address()->sa_family == AF_INET) {
EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(),
absl::StrCat("0.0.0.0:", std::to_string(port)));
}
close(socket->sock.Fd());
}
}
#ifdef GRPC_HAVE_IFADDRS
TEST(PosixEngineListenerUtils, ListenerContainerAddAllLocalAddressesTest) {
TestListenerSocketsContainer listener_sockets;
int port = grpc_pick_unused_port_or_die();
ChannelArgsEndpointConfig config;
struct ifaddrs* ifa = nullptr;
struct ifaddrs* ifa_it;
if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
// No ifaddresses available.
gpr_log(GPR_INFO,
"Skipping ListenerAddAllLocalAddressesTest because the machine "
"does not have interfaces configured for listening.");
return;
}
int num_ifaddrs = 0;
for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) {
++num_ifaddrs;
}
freeifaddrs(ifa);
auto result = ListenerContainerAddAllLocalAddresses(
listener_sockets, TcpOptionsFromEndpointConfig(config), port);
if (num_ifaddrs == 0 || !result.ok()) {
// Its possible that the machine may not have any Ipv4/Ipv6 interfaces
// configured for listening. In that case, dont fail test.
gpr_log(GPR_INFO,
"Skipping ListenerAddAllLocalAddressesTest because the machine "
"does not have Ipv6/Ipv6 interfaces configured for listening.");
return;
}
// Some sockets have been created and bound to interfaces on the machiene.
// Verify that they are listening on the correct port.
EXPECT_GT(*result, 0);
port = *result;
EXPECT_GE(listener_sockets.Size(), 1);
EXPECT_LE(listener_sockets.Size(), num_ifaddrs);
for (auto socket = listener_sockets.begin(); socket != listener_sockets.end();
++socket) {
ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 ||
(*socket).addr.address()->sa_family == AF_INET);
EXPECT_EQ(SockaddrGetPort((*socket).addr), port);
close(socket->sock.Fd());
}
}
#endif // GRPC_HAVE_IFADDRS
} // namespace posix_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */
int main(int argc, char** argv) { return 0; }
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

@ -384,6 +384,24 @@ TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) {
#endif
}
TEST(TcpPosixSocketUtilsTest, SockAddrPortTest) {
EventEngine::ResolvedAddress wild6 = SockaddrMakeWild6(20);
EventEngine::ResolvedAddress wild4 = SockaddrMakeWild4(20);
// Verify the string description matches the expected wildcard address with
// correct port number.
EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:20");
EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:20");
// Update the port values.
SockaddrSetPort(wild4, 21);
SockaddrSetPort(wild6, 22);
// Read back the port values.
EXPECT_EQ(SockaddrGetPort(wild4), 21);
EXPECT_EQ(SockaddrGetPort(wild6), 22);
// Ensure the string description reflects the updated port values.
EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:21");
EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:22");
}
} // namespace posix_engine
} // namespace grpc_event_engine
@ -394,6 +412,6 @@ int main(int argc, char** argv) {
#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */
int main(int argc, char** argv) { return 1; }
int main(int argc, char** argv) { return 0; }
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

@ -5333,6 +5333,28 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "posix_engine_listener_utils_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save