From 7eb5fb5a252913263e0893f1fd8b944214183a79 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Wed, 2 Nov 2022 13:25:25 -0700 Subject: [PATCH] Util functions to help with posix event engine listener implementation (#31435) * Util functions to help with posix engine listener implementation * sanity * update comments in posix_engine_listener_utils.h * review comments * iwyu * revert prev commit * iwyu * update build * regenerate projects * minor fixes * comments --- CMakeLists.txt | 43 ++ build_autogenerated.yaml | 19 + src/core/BUILD | 24 ++ .../posix_engine_listener_utils.cc | 375 ++++++++++++++++++ .../posix_engine_listener_utils.h | 91 +++++ .../posix_engine/tcp_socket_utils.cc | 124 +++++- .../posix_engine/tcp_socket_utils.h | 34 +- test/core/event_engine/posix/BUILD | 18 + .../posix/posix_engine_listener_utils_test.cc | 162 ++++++++ .../posix/tcp_posix_socket_utils_test.cc | 20 +- tools/run_tests/generated/tests.json | 22 + 11 files changed, 924 insertions(+), 8 deletions(-) create mode 100644 src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc create mode 100644 src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h create mode 100644 test/core/event_engine/posix/posix_engine_listener_utils_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 06561be561b..55226f9dccf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1071,6 +1071,9 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_endpoint_test) endif() + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx posix_engine_listener_utils_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx posix_event_engine_test) endif() @@ -14870,6 +14873,46 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ) +endif() +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(posix_engine_listener_utils_test + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc + src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc + src/core/lib/iomgr/socket_mutator.cc + test/core/event_engine/posix/posix_engine_listener_utils_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(posix_engine_listener_utils_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(posix_engine_listener_utils_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 0df061e02aa..cc8ef3c222f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8633,6 +8633,25 @@ targets: - linux - posix - mac +- name: posix_engine_listener_utils_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h + - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h + - src/core/lib/iomgr/socket_mutator.h + src: + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc + - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc + - src/core/lib/iomgr/socket_mutator.cc + - test/core/event_engine/posix/posix_engine_listener_utils_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac - name: posix_event_engine_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 2e0019b5bc5..c8c7f8eabeb 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1703,6 +1703,30 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "posix_event_engine_listener_utils", + srcs = [ + "lib/event_engine/posix_engine/posix_engine_listener_utils.cc", + ], + hdrs = [ + "lib/event_engine/posix_engine/posix_engine_listener_utils.h", + ], + external_deps = [ + "absl/cleanup", + "absl/status", + "absl/status:statusor", + "absl/strings", + ], + deps = [ + "iomgr_port", + "posix_event_engine_tcp_socket_utils", + "socket_mutator", + "status_helper", + "//:event_engine_base_hdrs", + "//:gpr", + ], +) + grpc_cc_library( name = "posix_event_engine", srcs = ["lib/event_engine/posix_engine/posix_engine.cc"], diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc new file mode 100644 index 00000000000..55c02408300 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc @@ -0,0 +1,375 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" + +#include +#include +#include +#include + +#include +#include + +#include "absl/cleanup/cleanup.h" +#include "absl/status/status.h" + +#include + +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/iomgr/port.h" +#include "src/core/lib/iomgr/socket_mutator.h" + +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // IWYU pragma: keep + +#include "absl/strings/str_cat.h" +#endif + +namespace grpc_event_engine { +namespace posix_engine { + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON + +namespace { + +using ResolvedAddress = + grpc_event_engine::experimental::EventEngine::ResolvedAddress; +using ListenerSocket = ListenerSocketsContainer::ListenerSocket; + +#ifdef GRPC_HAVE_IFADDRS + +// Bind to "::" to get a port number not used by any address. +absl::StatusOr GetUnusedPort() { + ResolvedAddress wild = SockaddrMakeWild6(0); + PosixSocketWrapper::DSMode dsmode; + auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild, + SOCK_STREAM, 0, dsmode); + GRPC_RETURN_IF_ERROR(sock.status()); + if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) { + wild = SockaddrMakeWild4(0); + } + if (bind(sock->Fd(), wild.address(), wild.size()) != 0) { + close(sock->Fd()); + return absl::FailedPreconditionError( + absl::StrCat("bind(GetUnusedPort): ", std::strerror(errno))); + } + socklen_t len = wild.size(); + if (getsockname(sock->Fd(), const_cast(wild.address()), &len) != + 0) { + close(sock->Fd()); + return absl::FailedPreconditionError( + absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno))); + } + close(sock->Fd()); + int port = SockaddrGetPort(wild); + if (port <= 0) { + return absl::FailedPreconditionError("Bad port"); + } + return port; +} + +bool SystemHasIfAddrs() { return true; } + +#else // GRPC_HAVE_IFADDRS + +bool SystemHasIfAddrs() { return false; } + +#endif // GRPC_HAVE_IFADDRS + +// get max listen queue size on linux +int InitMaxAcceptQueueSize() { + int n = SOMAXCONN; + char buf[64]; + FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r"); + int max_accept_queue_size; + if (fp == nullptr) { + // 2.4 kernel. + return SOMAXCONN; + } + if (fgets(buf, sizeof buf, fp)) { + char* end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == '\n') { + n = static_cast(i); + } + } + fclose(fp); + max_accept_queue_size = n; + + if (max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + max_accept_queue_size); + } + return max_accept_queue_size; +} + +int GetMaxAcceptQueueSize() { + static const int kMaxAcceptQueueSize = InitMaxAcceptQueueSize(); + return kMaxAcceptQueueSize; +} + +// Prepare a recently-created socket for listening. +absl::Status PrepareSocket(const PosixTcpOptions& options, + ListenerSocket& socket) { + ResolvedAddress sockname_temp; + absl::Status error; + int fd = socket.sock.Fd(); + GPR_ASSERT(fd >= 0); + bool close_fd = true; + socket.zero_copy_enabled = false; + socket.port = 0; + auto sock_cleanup = absl::MakeCleanup([&close_fd, fd]() -> void { + if (close_fd && fd >= 0) { + close(fd); + } + }); + if (PosixSocketWrapper::IsSocketReusePortSupported() && + options.allow_reuse_port && socket.addr.address()->sa_family != AF_UNIX) { + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReusePort(1)); + } + +#ifdef GRPC_LINUX_ERRQUEUE + if (!socket.sock.SetSocketZeroCopy().ok()) { + // it's not fatal, so just log it. + gpr_log(GPR_DEBUG, "Node does not support SO_ZEROCOPY, continuing."); + } else { + socket.zero_copy_enabled = true; + } +#endif + + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNonBlocking(1)); + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketCloexec(1)); + + if (socket.addr.address()->sa_family != AF_UNIX) { + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketLowLatency(1)); + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReuseAddr(1)); + socket.sock.TrySetSocketTcpUserTimeout(options, false); + } + GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible()); + GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions( + GRPC_FD_SERVER_LISTENER_USAGE, options)); + + if (bind(fd, socket.addr.address(), socket.addr.size()) < 0) { + return absl::FailedPreconditionError( + absl::StrCat("Error in bind: ", std::strerror(errno))); + } + + if (listen(fd, GetMaxAcceptQueueSize()) < 0) { + return absl::FailedPreconditionError( + absl::StrCat("Error in listen: ", std::strerror(errno))); + } + socklen_t len = static_cast(sizeof(struct sockaddr_storage)); + + if (getsockname(fd, const_cast(sockname_temp.address()), &len) < + 0) { + return absl::FailedPreconditionError( + absl::StrCat("Error in getsockname: ", std::strerror(errno))); + } + + socket.port = SockaddrGetPort(ResolvedAddress(sockname_temp.address(), len)); + // No errors. Set close_fd to false to ensure the socket is not closed. + close_fd = false; + return absl::OkStatus(); +} + +} // namespace + +absl::StatusOr CreateAndPrepareListenerSocket( + const PosixTcpOptions& options, const ResolvedAddress& addr) { + ResolvedAddress addr4_copy; + ListenerSocket socket; + auto result = PosixSocketWrapper::CreateDualStackSocket( + nullptr, addr, SOCK_STREAM, 0, socket.dsmode); + if (!result.ok()) { + return result.status(); + } + socket.sock = *result; + if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 && + SockaddrIsV4Mapped(&addr, &addr4_copy)) { + socket.addr = addr4_copy; + } else { + socket.addr = addr; + } + + GRPC_RETURN_IF_ERROR(PrepareSocket(options, socket)); + GPR_ASSERT(socket.port > 0); + return socket; +} + +absl::StatusOr ListenerContainerAddAllLocalAddresses( + ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options, + int requested_port) { +#ifdef GRPC_HAVE_IFADDRS + absl::Status op_status = absl::OkStatus(); + struct ifaddrs* ifa = nullptr; + struct ifaddrs* ifa_it; + bool no_local_addresses = true; + int assigned_port = 0; + if (requested_port == 0) { + auto result = GetUnusedPort(); + GRPC_RETURN_IF_ERROR(result.status()); + requested_port = *result; + gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); + } + if (getifaddrs(&ifa) != 0 || ifa == nullptr) { + return absl::FailedPreconditionError( + absl::StrCat("getifaddrs: ", std::strerror(errno))); + } + for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) { + ResolvedAddress addr; + socklen_t len; + const char* ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : ""); + if (ifa_it->ifa_addr == nullptr) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + len = static_cast(sizeof(sockaddr_in)); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + len = static_cast(sizeof(sockaddr_in6)); + } else { + continue; + } + memcpy(const_cast(addr.address()), ifa_it->ifa_addr, len); + SockaddrSetPort(addr, requested_port); + std::string addr_str = *SockaddrToString(&addr, false); + gpr_log(GPR_DEBUG, + "Adding local addr from interface %s flags 0x%x to server: %s", + ifa_name, ifa_it->ifa_flags, addr_str.c_str()); + // We could have multiple interfaces with the same address (e.g., + // bonding), so look for duplicates. + if (listener_sockets.Find(addr).ok()) { + gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", + addr_str.c_str(), ifa_name); + continue; + } + auto result = CreateAndPrepareListenerSocket(options, addr); + if (!result.ok()) { + op_status = absl::FailedPreconditionError( + absl::StrCat("Failed to add listener: ", addr_str, + " due to error: ", result.status().message())); + break; + } else { + listener_sockets.Append(*result); + assigned_port = result->port; + no_local_addresses = false; + } + } + freeifaddrs(ifa); + GRPC_RETURN_IF_ERROR(op_status); + if (no_local_addresses) { + return absl::FailedPreconditionError("No local addresses"); + } + return assigned_port; + +#else + GPR_ASSERT(false && "System does not support ifaddrs"); +#endif +} + +absl::StatusOr ListenerContainerAddWildcardAddresses( + ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options, + int requested_port) { + ResolvedAddress wild4 = SockaddrMakeWild4(requested_port); + ResolvedAddress wild6 = SockaddrMakeWild6(requested_port); + absl::StatusOr v6_sock; + absl::StatusOr v4_sock; + int assigned_port = 0; + + if (SystemHasIfAddrs() && options.expand_wildcard_addrs) { + return ListenerContainerAddAllLocalAddresses(listener_sockets, options, + requested_port); + } + + // Try listening on IPv6 first. + v6_sock = CreateAndPrepareListenerSocket(options, wild6); + if (v6_sock.ok()) { + listener_sockets.Append(*v6_sock); + requested_port = v6_sock->port; + assigned_port = v6_sock->port; + if (v6_sock->dsmode == PosixSocketWrapper::DSMODE_DUALSTACK || + v6_sock->dsmode == PosixSocketWrapper::DSMODE_IPV4) { + return v6_sock->port; + } + } + // If we got a v6-only socket or nothing, try adding 0.0.0.0. + SockaddrSetPort(wild4, requested_port); + v4_sock = CreateAndPrepareListenerSocket(options, wild4); + if (v4_sock.ok()) { + assigned_port = v4_sock->port; + listener_sockets.Append(*v4_sock); + } + if (assigned_port > 0) { + if (!v6_sock.ok()) { + gpr_log(GPR_INFO, + "Failed to add :: listener, the environment may not support " + "IPv6: %s", + v6_sock.status().ToString().c_str()); + } + if (!v4_sock.ok()) { + gpr_log(GPR_INFO, + "Failed to add 0.0.0.0 listener, " + "the environment may not support IPv4: %s", + v4_sock.status().ToString().c_str()); + } + return assigned_port; + } else { + GPR_ASSERT(!v6_sock.ok()); + GPR_ASSERT(!v4_sock.ok()); + return absl::FailedPreconditionError(absl::StrCat( + "Failed to add any wildcard listeners: ", v6_sock.status().message(), + v4_sock.status().message())); + } +} + +#else // GRPC_POSIX_SOCKET_UTILS_COMMON + +absl::StatusOr +CreateAndPrepareListenerSocket(const PosixTcpOptions& /*options*/, + const grpc_event_engine::experimental:: + EventEngine::ResolvedAddress& /*addr*/) { + GPR_ASSERT( + false && + "CreateAndPrepareListenerSocket is not supported on this platform"); +} + +absl::StatusOr ListenerContainerAddWildcardAddresses( + ListenerSocketsContainer& /*listener_sockets*/, + const PosixTcpOptions& /*options*/, int /*requested_port*/) { + GPR_ASSERT(false && + "ListenerContainerAddWildcardAddresses is not supported on this " + "platform"); +} + +absl::StatusOr ListenerContainerAddAllLocalAddresses( + ListenerSocketsContainer& /*listener_sockets*/, + const PosixTcpOptions& /*options*/, int /*requested_port*/) { + GPR_ASSERT(false && + "ListenerContainerAddAllLocalAddresses is not supported on this " + "platform"); +} + +#endif // GRPC_POSIX_SOCKET_UTILS_COMMON + +} // namespace posix_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h new file mode 100644 index 00000000000..a010a020b71 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h @@ -0,0 +1,91 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H +#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H + +#include + +#include "absl/status/statusor.h" + +#include + +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" + +namespace grpc_event_engine { +namespace posix_engine { + +// This interface exists to allow different Event Engines to implement different +// custom interception operations while a socket is Appended. The +// listener util functions are defined over this interface and thus can be +// shared across multiple event engines. +class ListenerSocketsContainer { + public: + struct ListenerSocket { + // Listener socket fd + PosixSocketWrapper sock; + // Assigned/chosen listening port + int port; + // Socket configuration + bool zero_copy_enabled; + // Address at which the socket is listening for connections + grpc_event_engine::experimental::EventEngine::ResolvedAddress addr; + // Dual stack mode. + PosixSocketWrapper::DSMode dsmode; + }; + // Adds a socket to the internal db of sockets associated with a listener. + virtual void Append(ListenerSocket socket) = 0; + + // Returns a non-OK status if the socket cannot be found. Otherwise, returns + // the socket. + virtual absl::StatusOr Find( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& + addr) = 0; + + virtual ~ListenerSocketsContainer() = default; +}; + +// Creates and configures a socket to be used by the Event Engine Listener. The +// type of the socket to create is determined by the by the passed address. The +// socket configuration is specified by passed tcp options. If successful, it +// returns a ListenerSocketsContainer::ListenerSocket type which holds the +// socket fd and its dsmode. If unsuccessful, it returns a Not-OK status. +absl::StatusOr +CreateAndPrepareListenerSocket( + const PosixTcpOptions& options, + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr); + +// Instead of creating and adding a socket bound to specific address, this +// function creates and adds a socket bound to the wildcard address on the +// server. The newly created socket is configured according to the passed +// options and added to the passed ListenerSocketsContainer object. The function +// returns the port at which the created socket listens for incoming +// connections. +absl::StatusOr ListenerContainerAddWildcardAddresses( + ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options, + int requested_port); + +// Get all addresses assigned to network interfaces on the machine and create +// and add a socket for each local address. Each newly created socket is +// configured according to the passed options and added to the passed +// ListenerSocketsContainer object. The requested_port is the port to use for +// every socket. If set to 0, a random port will be used for every socket. +// The function returns the chosen port number for all created sockets. +absl::StatusOr ListenerContainerAddAllLocalAddresses( + ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options, + int requested_port); + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_UTILS_H diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc index 4e1f4c58642..089c73139fa 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include "absl/cleanup/cleanup.h" #include "absl/status/statusor.h" @@ -58,6 +59,7 @@ #include "src/core/lib/gprpp/strerror.h" #ifdef GRPC_HAVE_UNIX_SOCKET +#include // IWYU pragma: keep #include #endif @@ -195,7 +197,7 @@ int Accept4(int sockfd, grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, int nonblock, int cloexec) { int fd, flags; - socklen_t len = addr.size(); + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; fd = accept(sockfd, const_cast(addr.address()), &len); if (fd >= 0) { if (nonblock) { @@ -224,7 +226,7 @@ int Accept4(int sockfd, int flags = 0; flags |= nonblock ? SOCK_NONBLOCK : 0; flags |= cloexec ? SOCK_CLOEXEC : 0; - socklen_t len = addr.size(); + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; return accept4(sockfd, const_cast(addr.address()), &len, flags); } @@ -354,6 +356,124 @@ absl::StatusOr SockaddrToString( return out; } +EventEngine::ResolvedAddress SockaddrMakeWild6(int port) { + EventEngine::ResolvedAddress resolved_wild_out; + sockaddr_in6* wild_out = reinterpret_cast( + const_cast(resolved_wild_out.address())); + GPR_ASSERT(port >= 0 && port < 65536); + memset(wild_out, 0, sizeof(sockaddr_in6)); + wild_out->sin6_family = AF_INET6; + wild_out->sin6_port = htons(static_cast(port)); + return EventEngine::ResolvedAddress( + reinterpret_cast(wild_out), + static_cast(sizeof(sockaddr_in6))); +} + +EventEngine::ResolvedAddress SockaddrMakeWild4(int port) { + EventEngine::ResolvedAddress resolved_wild_out; + sockaddr_in* wild_out = reinterpret_cast( + const_cast(resolved_wild_out.address())); + GPR_ASSERT(port >= 0 && port < 65536); + memset(wild_out, 0, sizeof(sockaddr_in)); + wild_out->sin_family = AF_INET; + wild_out->sin_port = htons(static_cast(port)); + return EventEngine::ResolvedAddress( + reinterpret_cast(wild_out), + static_cast(sizeof(sockaddr_in))); +} + +int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr) { + const sockaddr* addr = resolved_addr.address(); + switch (addr->sa_family) { + case AF_INET: + return ntohs((reinterpret_cast(addr))->sin_port); + case AF_INET6: + return ntohs((reinterpret_cast(addr))->sin6_port); +#ifdef GRPC_HAVE_UNIX_SOCKET + case AF_UNIX: + return 1; +#endif + default: + gpr_log(GPR_ERROR, "Unknown socket family %d in SockaddrGetPort", + addr->sa_family); + abort(); + } +} + +void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port) { + sockaddr* addr = const_cast(resolved_addr.address()); + switch (addr->sa_family) { + case AF_INET: + GPR_ASSERT(port >= 0 && port < 65536); + (reinterpret_cast(addr))->sin_port = + htons(static_cast(port)); + return; + case AF_INET6: + GPR_ASSERT(port >= 0 && port < 65536); + (reinterpret_cast(addr))->sin6_port = + htons(static_cast(port)); + return; + default: + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", + addr->sa_family); + abort(); + } +} + +void UnlinkIfUnixDomainSocket( + const EventEngine::ResolvedAddress& resolved_addr) { +#ifdef GRPC_HAVE_UNIX_SOCKET + if (resolved_addr.address()->sa_family != AF_UNIX) { + return; + } + struct sockaddr_un* un = reinterpret_cast( + const_cast(resolved_addr.address())); + + // There is nothing to unlink for an abstract unix socket + if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') { + return; + } + + struct stat st; + if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { + unlink(un->sun_path); + } +#else + (void)resolved_addr; +#endif +} + +absl::optional SockaddrIsWildcard( + const EventEngine::ResolvedAddress& addr) { + const EventEngine::ResolvedAddress* resolved_addr = &addr; + EventEngine::ResolvedAddress addr4_normalized; + if (SockaddrIsV4Mapped(resolved_addr, &addr4_normalized)) { + resolved_addr = &addr4_normalized; + } + if (resolved_addr->address()->sa_family == AF_INET) { + // Check for 0.0.0.0 + const sockaddr_in* addr4 = + reinterpret_cast(resolved_addr->address()); + if (addr4->sin_addr.s_addr != 0) { + return absl::nullopt; + } + return static_cast(ntohs(addr4->sin_port)); + } else if (resolved_addr->address()->sa_family == AF_INET6) { + // Check for :: + const sockaddr_in6* addr6 = + reinterpret_cast(resolved_addr->address()); + int i; + for (i = 0; i < 16; i++) { + if (addr6->sin6_addr.s6_addr[i] != 0) { + return absl::nullopt; + } + } + return static_cast(ntohs(addr6->sin6_port)); + } else { + return absl::nullopt; + } +} + // Instruct the kernel to wait for specified number of bytes to be received on // the socket before generating an interrupt for packet receive. If the call // succeeds, it returns the number of bytes (wait threshold) that was actually diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h index 43d96c7e3b7..7a1db0e416d 100644 --- a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h @@ -23,6 +23,7 @@ #include "absl/status/status.h" #include "absl/status/statusor.h" +#include "absl/types/optional.h" #include #include @@ -159,12 +160,35 @@ bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, EventEngine::ResolvedAddress* resolved_addr6_out); -// Converts a EventEngine::ResolvedAddress into a newly-allocated human-readable -// string. +// Make wild card IPv6 address with specified port. +EventEngine::ResolvedAddress SockaddrMakeWild6(int port); + +// Make wild card IPv4 address with specified port. +EventEngine::ResolvedAddress SockaddrMakeWild4(int port); + +// Given a resolved address, return the port number in the address. +int SockaddrGetPort(const EventEngine::ResolvedAddress& resolved_addr); + +// Modifes the passed address to use the specified port number. The +// operation would only succeed if the passed address is an IPv4 or Ipv6 +// address. Otherwise the function call would abort fail. +void SockaddrSetPort(EventEngine::ResolvedAddress& resolved_addr, int port); + +// Unlink the path pointed to by the given address if it refers to UDS path. +void UnlinkIfUnixDomainSocket( + const EventEngine::ResolvedAddress& resolved_addr); + +// Returns the port number associated with the address if the given address is +// not a wildcard ipv6 or ipv6 address. Otherwise returns absl::nullopt +absl::optional SockaddrIsWildcard( + const EventEngine::ResolvedAddress& addr); + +// Converts a EventEngine::ResolvedAddress into a newly-allocated +// human-readable string. // -// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are recognized. -// If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are -// displayed as plain IPv4. +// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are +// recognized. If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 +// addresses are displayed as plain IPv4. absl::StatusOr SockaddrToString( const EventEngine::ResolvedAddress* resolved_addr, bool normalize); diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index cb93ab8eeaf..5a6f63a3d67 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -191,3 +191,21 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "posix_engine_listener_utils_test", + srcs = ["posix_engine_listener_utils_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_windows", + ], + uses_event_engine = False, + deps = [ + "//src/core:event_engine_common", + "//src/core:posix_event_engine_listener_utils", + "//src/core:posix_event_engine_tcp_socket_utils", + "//src/core:socket_mutator", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/posix/posix_engine_listener_utils_test.cc b/test/core/event_engine/posix/posix_engine_listener_utils_test.cc new file mode 100644 index 00000000000..b212c9962e8 --- /dev/null +++ b/test/core/event_engine/posix/posix_engine_listener_utils_test.cc @@ -0,0 +1,162 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "gtest/gtest.h" + +#include + +#include "src/core/lib/iomgr/port.h" + +// This test won't work except with posix sockets enabled +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON + +#include + +#include + +#include + +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "test/core/util/port.h" + +namespace grpc_event_engine { +namespace posix_engine { + +namespace { + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; + +class TestListenerSocketsContainer : public ListenerSocketsContainer { + public: + void Append(ListenerSocket socket) override { sockets_.push_back(socket); } + + absl::StatusOr Find( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) + override { + for (auto socket = sockets_.begin(); socket != sockets_.end(); ++socket) { + if (socket->addr.size() == addr.size() && + memcmp(socket->addr.address(), addr.address(), addr.size()) == 0) { + return *socket; + } + } + return absl::NotFoundError("Socket not found!"); + } + + int Size() { return static_cast(sockets_.size()); } + + std::list::const_iterator begin() { return sockets_.begin(); } + std::list::const_iterator end() { return sockets_.end(); } + + private: + std::list sockets_; +}; + +} // namespace + +TEST(PosixEngineListenerUtils, ListenerContainerAddWildcardAddressesTest) { + TestListenerSocketsContainer listener_sockets; + int port = grpc_pick_unused_port_or_die(); + ChannelArgsEndpointConfig config; + auto result = ListenerContainerAddWildcardAddresses( + listener_sockets, TcpOptionsFromEndpointConfig(config), port); + EXPECT_TRUE(result.ok()); + EXPECT_GT(*result, 0); + port = *result; + EXPECT_GE(listener_sockets.Size(), 1); + EXPECT_LE(listener_sockets.Size(), 2); + for (auto socket = listener_sockets.begin(); socket != listener_sockets.end(); + ++socket) { + ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 || + (*socket).addr.address()->sa_family == AF_INET); + if ((*socket).addr.address()->sa_family == AF_INET6) { + EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(), + absl::StrCat("[::]:", std::to_string(port))); + } else if ((*socket).addr.address()->sa_family == AF_INET) { + EXPECT_EQ(SockaddrToString(&(*socket).addr, true).value(), + absl::StrCat("0.0.0.0:", std::to_string(port))); + } + close(socket->sock.Fd()); + } +} + +#ifdef GRPC_HAVE_IFADDRS +TEST(PosixEngineListenerUtils, ListenerContainerAddAllLocalAddressesTest) { + TestListenerSocketsContainer listener_sockets; + int port = grpc_pick_unused_port_or_die(); + ChannelArgsEndpointConfig config; + struct ifaddrs* ifa = nullptr; + struct ifaddrs* ifa_it; + if (getifaddrs(&ifa) != 0 || ifa == nullptr) { + // No ifaddresses available. + gpr_log(GPR_INFO, + "Skipping ListenerAddAllLocalAddressesTest because the machine " + "does not have interfaces configured for listening."); + return; + } + int num_ifaddrs = 0; + for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) { + ++num_ifaddrs; + } + freeifaddrs(ifa); + auto result = ListenerContainerAddAllLocalAddresses( + listener_sockets, TcpOptionsFromEndpointConfig(config), port); + if (num_ifaddrs == 0 || !result.ok()) { + // Its possible that the machine may not have any Ipv4/Ipv6 interfaces + // configured for listening. In that case, dont fail test. + gpr_log(GPR_INFO, + "Skipping ListenerAddAllLocalAddressesTest because the machine " + "does not have Ipv6/Ipv6 interfaces configured for listening."); + return; + } + // Some sockets have been created and bound to interfaces on the machiene. + // Verify that they are listening on the correct port. + EXPECT_GT(*result, 0); + port = *result; + EXPECT_GE(listener_sockets.Size(), 1); + EXPECT_LE(listener_sockets.Size(), num_ifaddrs); + for (auto socket = listener_sockets.begin(); socket != listener_sockets.end(); + ++socket) { + ASSERT_TRUE((*socket).addr.address()->sa_family == AF_INET6 || + (*socket).addr.address()->sa_family == AF_INET); + EXPECT_EQ(SockaddrGetPort((*socket).addr), port); + close(socket->sock.Fd()); + } +} +#endif // GRPC_HAVE_IFADDRS + +} // namespace posix_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */ + +int main(int argc, char** argv) { return 0; } + +#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */ diff --git a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc index b2e259e1a4c..cc39c41da31 100644 --- a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc +++ b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc @@ -384,6 +384,24 @@ TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) { #endif } +TEST(TcpPosixSocketUtilsTest, SockAddrPortTest) { + EventEngine::ResolvedAddress wild6 = SockaddrMakeWild6(20); + EventEngine::ResolvedAddress wild4 = SockaddrMakeWild4(20); + // Verify the string description matches the expected wildcard address with + // correct port number. + EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:20"); + EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:20"); + // Update the port values. + SockaddrSetPort(wild4, 21); + SockaddrSetPort(wild6, 22); + // Read back the port values. + EXPECT_EQ(SockaddrGetPort(wild4), 21); + EXPECT_EQ(SockaddrGetPort(wild6), 22); + // Ensure the string description reflects the updated port values. + EXPECT_EQ(SockaddrToString(&wild4, true).value(), "0.0.0.0:21"); + EXPECT_EQ(SockaddrToString(&wild6, true).value(), "[::]:22"); +} + } // namespace posix_engine } // namespace grpc_event_engine @@ -394,6 +412,6 @@ int main(int argc, char** argv) { #else /* GRPC_POSIX_SOCKET_UTILS_COMMON */ -int main(int argc, char** argv) { return 1; } +int main(int argc, char** argv) { return 0; } #endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 6473637cac6..ca0a9024f2c 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -5333,6 +5333,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "posix_engine_listener_utils_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,