diff --git a/BUILD b/BUILD index f9e008e361a..e24e8909387 100644 --- a/BUILD +++ b/BUILD @@ -2789,6 +2789,34 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "posix_event_engine_tcp_socket_utils", + srcs = [ + "src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc", + ], + hdrs = [ + "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h", + ], + external_deps = [ + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/strings:str_format", + "absl/types:optional", + "absl/utility", + ], + deps = [ + "event_engine_base_hdrs", + "gpr", + "grpc_codegen", + "iomgr_port", + "ref_counted_ptr", + "resource_quota", + "socket_mutator", + "useful", + ], +) + grpc_cc_library( name = "posix_event_engine", srcs = ["src/core/lib/event_engine/posix_engine/posix_engine.cc"], @@ -3016,6 +3044,22 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "socket_mutator", + srcs = [ + "src/core/lib/iomgr/socket_mutator.cc", + ], + hdrs = [ + "src/core/lib/iomgr/socket_mutator.h", + ], + deps = [ + "channel_args", + "gpr", + "grpc_codegen", + "useful", + ], +) + grpc_cc_library( name = "backoff", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e750b36992..023d843552b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1180,6 +1180,9 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx tcp_client_posix_test) endif() + if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + add_dependencies(buildtests_cxx tcp_posix_socket_utils_test) + endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx tcp_server_posix_test) endif() @@ -17734,6 +17737,45 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ) +endif() +endif() +if(gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) + + add_executable(tcp_posix_socket_utils_test + src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc + src/core/lib/iomgr/socket_mutator.cc + test/core/event_engine/posix/tcp_posix_socket_utils_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc + ) + + target_include_directories(tcp_posix_socket_utils_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} + ) + + target_link_libraries(tcp_posix_socket_utils_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + ) + + endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 26a526072be..79f33e592f3 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9767,6 +9767,24 @@ targets: - linux - posix - mac +- name: tcp_posix_socket_utils_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h + - src/core/lib/iomgr/socket_mutator.h + src: + - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc + - src/core/lib/iomgr/socket_mutator.cc + - test/core/event_engine/posix/tcp_posix_socket_utils_test.cc + deps: + - grpc_test_util + platforms: + - linux + - posix + - mac + uses_polling: false - name: tcp_server_posix_test gtest: true build: test diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc new file mode 100644 index 00000000000..87421b52e95 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc @@ -0,0 +1,883 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include + +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON +#ifdef GRPC_LINUX_TCP_H +#include +#else +#include // IWYU pragma: keep +#include +#endif +#include +#include +#include +#endif + +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/str_format.h" + +#include +#include + +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/gprpp/host_port.h" + +#ifdef GRPC_HAVE_UNIX_SOCKET +#include +#endif + +namespace grpc_event_engine { +namespace posix_engine { + +using ::grpc_event_engine::experimental::EndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; + +namespace { + +int AdjustValue(int default_value, int min_value, int max_value, + absl::optional actual_value) { + if (!actual_value.has_value() || *actual_value < min_value || + *actual_value > max_value) { + return default_value; + } + return *actual_value; +} + +// The default values for TCP_USER_TIMEOUT are currently configured to be in +// line with the default values of KEEPALIVE_TIMEOUT as proposed in +// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md */ +int kDefaultClientUserTimeoutMs = 20000; +int kDefaultServerUserTimeoutMs = 20000; +bool kDefaultClientUserTimeoutEnabled = false; +bool kDefaultServerUserTimeoutEnabled = true; + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON + +absl::Status ErrorForFd( + int fd, const experimental::EventEngine::ResolvedAddress& addr) { + if (fd >= 0) return absl::OkStatus(); + const char* addr_str = reinterpret_cast(addr.address()); + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("socket: ", strerror(errno), + std::string(addr_str, addr.size()))); +} + +int CreateSocket(std::function socket_factory, int family, + int type, int protocol) { + return socket_factory != nullptr ? socket_factory(family, type, protocol) + : socket(family, type, protocol); +} + +const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; + +#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */ + +} // namespace + +PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) { + void* value; + PosixTcpOptions options; + options.tcp_read_chunk_size = AdjustValue( + PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE)); + options.tcp_min_read_chunk_size = + AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1, + PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)); + options.tcp_max_read_chunk_size = + AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1, + PosixTcpOptions::kMaxChunkSize, + config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)); + options.tcp_tx_zerocopy_send_bytes_threshold = + AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD)); + options.tcp_tx_zerocopy_max_simultaneous_sends = + AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS)); + options.tcp_tx_zero_copy_enabled = + (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1, + config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0); + options.keep_alive_time_ms = + AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS)); + options.keep_alive_timeout_ms = + AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)); + options.expand_wildcard_addrs = + (AdjustValue(0, 1, INT_MAX, + config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0); + options.allow_reuse_port = + (AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) != + 0); + + if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) { + options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size; + } + options.tcp_read_chunk_size = grpc_core::Clamp( + options.tcp_read_chunk_size, options.tcp_min_read_chunk_size, + options.tcp_max_read_chunk_size); + + value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA); + if (value != nullptr) { + options.resource_quota = + reinterpret_cast(value)->Ref(); + } + value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR); + if (value != nullptr) { + options.socket_mutator = + grpc_socket_mutator_ref(static_cast(value)); + } + return options; +} + +#ifdef GRPC_POSIX_SOCKETUTILS + +int Accept4(int sockfd, + grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, + int nonblock, int cloexec) { + int fd, flags; + socklen_t len = addr.size(); + fd = accept(sockfd, const_cast(addr.address()), &len); + if (fd >= 0) { + if (nonblock) { + flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) goto close_and_error; + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) goto close_and_error; + } + if (cloexec) { + flags = fcntl(fd, F_GETFD, 0); + if (flags < 0) goto close_and_error; + if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != 0) goto close_and_error; + } + } + return fd; + +close_and_error: + close(fd); + return -1; +} + +#elif GRPC_LINUX_SOCKETUTILS + +int Accept4(int sockfd, + grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, + int nonblock, int cloexec) { + int flags = 0; + flags |= nonblock ? SOCK_NONBLOCK : 0; + flags |= cloexec ? SOCK_CLOEXEC : 0; + socklen_t len = addr.size(); + return accept4(sockfd, const_cast(addr.address()), &len, flags); +} + +#endif /* GRPC_LINUX_SOCKETUTILS */ + +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON + +bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, + EventEngine::ResolvedAddress* resolved_addr4_out) { + const sockaddr* addr = resolved_addr->address(); + if (addr->sa_family == AF_INET6) { + const sockaddr_in6* addr6 = reinterpret_cast(addr); + sockaddr_in* addr4_out = + resolved_addr4_out == nullptr + ? nullptr + : reinterpret_cast( + const_cast(resolved_addr4_out->address())); + + if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix, + sizeof(kV4MappedPrefix)) == 0) { + if (resolved_addr4_out != nullptr) { + // Normalize ::ffff:0.0.0.0/96 to IPv4. + memset(addr4_out, 0, sizeof(sockaddr_in)); + addr4_out->sin_family = AF_INET; + // s6_addr32 would be nice, but it's non-standard. + memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4); + addr4_out->sin_port = addr6->sin6_port; + *resolved_addr4_out = EventEngine::ResolvedAddress( + reinterpret_cast(addr4_out), + static_cast(sizeof(sockaddr_in))); + } + return true; + } + } + return false; +} + +bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, + EventEngine::ResolvedAddress* resolved_addr6_out) { + GPR_ASSERT(resolved_addr != resolved_addr6_out); + const sockaddr* addr = resolved_addr->address(); + sockaddr_in6* addr6_out = const_cast( + reinterpret_cast(resolved_addr6_out->address())); + if (addr->sa_family == AF_INET) { + const sockaddr_in* addr4 = reinterpret_cast(addr); + memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out)); + addr6_out->sin6_family = AF_INET6; + memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12); + memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4); + addr6_out->sin6_port = addr4->sin_port; + *resolved_addr6_out = EventEngine::ResolvedAddress( + reinterpret_cast(addr6_out), + static_cast(sizeof(sockaddr_in6))); + return true; + } + return false; +} + +absl::StatusOr SockaddrToString( + const EventEngine::ResolvedAddress* resolved_addr, bool normalize) { + const int save_errno = errno; + EventEngine::ResolvedAddress addr_normalized; + if (normalize && SockaddrIsV4Mapped(resolved_addr, &addr_normalized)) { + resolved_addr = &addr_normalized; + } + const sockaddr* addr = + reinterpret_cast(resolved_addr->address()); + std::string out; +#ifdef GRPC_HAVE_UNIX_SOCKET + if (addr->sa_family == AF_UNIX) { + const sockaddr_un* addr_un = reinterpret_cast(addr); + bool abstract = addr_un->sun_path[0] == '\0'; + if (abstract) { +#ifdef GPR_APPLE + int len = resolved_addr->size() - sizeof(addr_un->sun_family) - + sizeof(addr_un->sun_len); +#else + int len = resolved_addr->size() - sizeof(addr_un->sun_family); +#endif + if (len <= 0) { + return absl::InvalidArgumentError("Empty UDS abstract path"); + } + out = std::string(addr_un->sun_path, len); + } else { + size_t maxlen = sizeof(addr_un->sun_path); + if (strnlen(addr_un->sun_path, maxlen) == maxlen) { + return absl::InvalidArgumentError("UDS path is not null-terminated"); + } + out = std::string(addr_un->sun_path); + } + return out; + } +#endif + + const void* ip = nullptr; + int port = 0; + uint32_t sin6_scope_id = 0; + if (addr->sa_family == AF_INET) { + const sockaddr_in* addr4 = reinterpret_cast(addr); + ip = &addr4->sin_addr; + port = ntohs(addr4->sin_port); + } else if (addr->sa_family == AF_INET6) { + const sockaddr_in6* addr6 = reinterpret_cast(addr); + ip = &addr6->sin6_addr; + port = ntohs(addr6->sin6_port); + sin6_scope_id = addr6->sin6_scope_id; + } + char ntop_buf[INET6_ADDRSTRLEN]; + if (ip != nullptr && + inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) { + if (sin6_scope_id != 0) { + // Enclose sin6_scope_id with the format defined in RFC 6874 section 2. + std::string host_with_scope = + absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id); + out = grpc_core::JoinHostPort(host_with_scope, port); + } else { + out = grpc_core::JoinHostPort(ntop_buf, port); + } + } else { + return absl::InvalidArgumentError( + absl::StrCat("Unknown sockaddr family: ", addr->sa_family)); + } + // This is probably redundant, but we wouldn't want to log the wrong error. + errno = save_errno; + return out; +} + +// Instruct the kernel to wait for specified number of bytes to be received on +// the socket before generating an interrupt for packet receive. If the call +// succeeds, it returns the number of bytes (wait threshold) that was actually +// set. +absl::StatusOr PosixSocketWrapper::SetSocketRcvLowat(int bytes) { + if (setsockopt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes, sizeof(bytes)) != 0) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_RCVLOWAT): ", strerror(errno))); + } + return bytes; +} + +// Set a socket to use zerocopy +absl::Status PosixSocketWrapper::SetSocketZeroCopy() { +#ifdef GRPC_LINUX_ERRQUEUE + const int enable = 1; + auto err = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable)); + if (err != 0) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_ZEROCOPY): ", strerror(errno))); + } + return absl::OkStatus(); +#else + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_ZEROCOPY): ", strerror(ENOSYS))); +#endif +} + +// Set a socket to non blocking mode +absl::Status PosixSocketWrapper::SetSocketNonBlocking(int non_blocking) { + int oldflags = fcntl(fd_, F_GETFL, 0); + if (oldflags < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + if (non_blocking) { + oldflags |= O_NONBLOCK; + } else { + oldflags &= ~O_NONBLOCK; + } + + if (fcntl(fd_, F_SETFL, oldflags) != 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + return absl::OkStatus(); +} + +absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() { +#ifdef GRPC_HAVE_SO_NOSIGPIPE + int val = 1; + int newval; + socklen_t intlen = sizeof(newval); + if (0 != setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_NOSIGPIPE): ", strerror(errno))); + } + if (0 != getsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("getsockopt(SO_NOSIGPIPE): ", strerror(errno))); + } + if ((newval != 0) != (val != 0)) { + return absl::Status(absl::StatusCode::kInternal, + "Failed to set SO_NOSIGPIPE"); + } +#endif + return absl::OkStatus(); +} + +absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() { +#ifdef GRPC_HAVE_IP_PKTINFO + int get_local_ip = 1; + if (0 != setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(IP_PKTINFO): ", strerror(errno))); + } +#endif + return absl::OkStatus(); +} + +absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() { +#ifdef GRPC_HAVE_IPV6_RECVPKTINFO + int get_local_ip = 1; + if (0 != setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, + sizeof(get_local_ip))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(IPV6_RECVPKTINFO): ", strerror(errno))); + } +#endif + return absl::OkStatus(); +} + +absl::Status PosixSocketWrapper::SetSocketSndBuf(int buffer_size_bytes) { + return 0 == setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes, + sizeof(buffer_size_bytes)) + ? absl::OkStatus() + : absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_SNDBUF): ", strerror(errno))); +} + +absl::Status PosixSocketWrapper::SetSocketRcvBuf(int buffer_size_bytes) { + return 0 == setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes, + sizeof(buffer_size_bytes)) + ? absl::OkStatus() + : absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_RCVBUF): ", strerror(errno))); +} + +// Set a socket to close on exec +absl::Status PosixSocketWrapper::SetSocketCloexec(int close_on_exec) { + int oldflags = fcntl(fd_, F_GETFD, 0); + if (oldflags < 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + if (close_on_exec) { + oldflags |= FD_CLOEXEC; + } else { + oldflags &= ~FD_CLOEXEC; + } + + if (fcntl(fd_, F_SETFD, oldflags) != 0) { + return absl::Status(absl::StatusCode::kInternal, + absl::StrCat("fcntl: ", strerror(errno))); + } + + return absl::OkStatus(); +} + +// set a socket to reuse old addresses +absl::Status PosixSocketWrapper::SetSocketReuseAddr(int reuse) { + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_REUSEADDR): ", strerror(errno))); + } + if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("getsockopt(SO_REUSEADDR): ", strerror(errno))); + } + if ((newval != 0) != val) { + return absl::Status(absl::StatusCode::kInternal, + "Failed to set SO_REUSEADDR"); + } + + return absl::OkStatus(); +} + +// set a socket to reuse old ports +absl::Status PosixSocketWrapper::SetSocketReusePort(int reuse) { +#ifndef SO_REUSEPORT + return absl::Status(absl::StatusCode::kInternal, + "SO_REUSEPORT unavailable on compiling system"); +#else + int val = (reuse != 0); + int newval; + socklen_t intlen = sizeof(newval); + if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(SO_REUSEPORT): ", strerror(errno))); + } + if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("getsockopt(SO_REUSEPORT): ", strerror(errno))); + } + if ((newval != 0) != val) { + return absl::Status(absl::StatusCode::kInternal, + "Failed to set SO_REUSEPORT"); + } + + return absl::OkStatus(); +#endif +} + +bool PosixSocketWrapper::IsSocketReusePortSupported() { + static bool kSupportSoReusePort = []() -> bool { + int s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) { + // This might be an ipv6-only environment in which case + // 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in that + // case + s = socket(AF_INET6, SOCK_STREAM, 0); + } + if (s >= 0) { + PosixSocketWrapper sock(s); + return sock.SetSocketReusePort(1).ok(); + } else { + return false; + } + }(); + return kSupportSoReusePort; +} + +// Disable nagle algorithm +absl::Status PosixSocketWrapper::SetSocketLowLatency(int low_latency) { + int val = (low_latency != 0); + int newval; + socklen_t intlen = sizeof(newval); + if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("setsockopt(TCP_NODELAY): ", strerror(errno))); + } + if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) { + return absl::Status( + absl::StatusCode::kInternal, + absl::StrCat("getsockopt(TCP_NODELAY): ", strerror(errno))); + } + if ((newval != 0) != val) { + return absl::Status(absl::StatusCode::kInternal, + "Failed to set TCP_NODELAY"); + } + return absl::OkStatus(); +} + +#if GPR_LINUX == 1 +// For Linux, it will be detected to support TCP_USER_TIMEOUT +#ifndef TCP_USER_TIMEOUT +#define TCP_USER_TIMEOUT 18 +#endif +#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0 +#else +// For non-Linux, TCP_USER_TIMEOUT will be used if TCP_USER_TIMEOUT is defined. +#ifdef TCP_USER_TIMEOUT +#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0 +#else +#define TCP_USER_TIMEOUT 0 +#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1 +#endif // TCP_USER_TIMEOUT +#endif // GPR_LINUX == 1 + +// Whether the socket supports TCP_USER_TIMEOUT option. +// (0: don't know, 1: support, -1: not support) +static std::atomic g_socket_supports_tcp_user_timeout( + SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT); + +void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool enable, + int timeout, + bool is_client) { + if (is_client) { + kDefaultClientUserTimeoutEnabled = enable; + if (timeout > 0) { + kDefaultClientUserTimeoutMs = timeout; + } + } else { + kDefaultServerUserTimeoutEnabled = enable; + if (timeout > 0) { + kDefaultServerUserTimeoutMs = timeout; + } + } +} + +// Set TCP_USER_TIMEOUT +void PosixSocketWrapper::TrySetSocketTcpUserTimeout( + const PosixTcpOptions& options, bool is_client) { + if (g_socket_supports_tcp_user_timeout.load() < 0) { + return; + } + bool enable = is_client ? kDefaultClientUserTimeoutEnabled + : kDefaultServerUserTimeoutEnabled; + int timeout = + is_client ? kDefaultClientUserTimeoutMs : kDefaultServerUserTimeoutMs; + if (options.keep_alive_time_ms > 0) { + enable = options.keep_alive_time_ms != INT_MAX; + } + if (options.keep_alive_timeout_ms > 0) { + timeout = options.keep_alive_timeout_ms; + } + if (enable) { + int newval; + socklen_t len = sizeof(newval); + // If this is the first time to use TCP_USER_TIMEOUT, try to check + // if it is available. + if (g_socket_supports_tcp_user_timeout.load() == 0) { + if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) { + gpr_log(GPR_INFO, + "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT won't " + "be used thereafter"); + g_socket_supports_tcp_user_timeout.store(-1); + } else { + gpr_log(GPR_INFO, + "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be " + "used thereafter"); + g_socket_supports_tcp_user_timeout.store(1); + } + } + if (g_socket_supports_tcp_user_timeout.load() > 0) { + if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout, + sizeof(timeout))) { + gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return; + } + if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) { + gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s", strerror(errno)); + return; + } + if (newval != timeout) { + // Do not fail on failing to set TCP_USER_TIMEOUT + gpr_log(GPR_ERROR, "Failed to set TCP_USER_TIMEOUT"); + return; + } + } + } +} + +// Set a socket using a grpc_socket_mutator +absl::Status PosixSocketWrapper::SetSocketMutator( + grpc_fd_usage usage, grpc_socket_mutator* mutator) { + GPR_ASSERT(mutator); + if (!grpc_socket_mutator_mutate_fd(mutator, fd_, usage)) { + return absl::Status(absl::StatusCode::kInternal, + "grpc_socket_mutator failed."); + } + return absl::OkStatus(); +} + +absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions( + grpc_fd_usage usage, const PosixTcpOptions& options) { + if (options.socket_mutator == nullptr) { + return absl::OkStatus(); + } + return SetSocketMutator(usage, options.socket_mutator); +} + +bool PosixSocketWrapper::SetSocketDualStack() { + const int off = 0; + return 0 == setsockopt(fd_, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off)); +} + +bool PosixSocketWrapper::IsIpv6LoopbackAvailable() { + static bool kIpv6LoopbackAvailable = []() -> bool { + int fd = socket(AF_INET6, SOCK_STREAM, 0); + bool loopback_available = false; + if (fd < 0) { + gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed."); + } else { + sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */ + if (bind(fd, reinterpret_cast(&addr), sizeof(addr)) == 0) { + loopback_available = true; + } else { + gpr_log(GPR_INFO, + "Disabling AF_INET6 sockets because ::1 is not available."); + } + close(fd); + } + return loopback_available; + }(); + return kIpv6LoopbackAvailable; +} + +absl::StatusOr +PosixSocketWrapper::LocalAddress() { + EventEngine::ResolvedAddress addr; + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; + if (getsockname(fd_, const_cast(addr.address()), &len) < 0) { + return absl::InternalError(absl::StrCat("getsockname:", strerror(errno))); + } + return addr; +} + +absl::StatusOr PosixSocketWrapper::PeerAddress() { + EventEngine::ResolvedAddress addr; + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; + if (getpeername(fd_, const_cast(addr.address()), &len) < 0) { + return absl::InternalError(absl::StrCat("getpeername:", strerror(errno))); + } + return addr; +} + +absl::StatusOr PosixSocketWrapper::LocalAddressString() { + auto status = LocalAddress(); + if (!status.ok()) { + return status.status(); + } + return SockaddrToString(&(*status), true); +} + +absl::StatusOr PosixSocketWrapper::PeerAddressString() { + auto status = PeerAddress(); + if (!status.ok()) { + return status.status(); + } + return SockaddrToString(&(*status), true); +} + +absl::StatusOr PosixSocketWrapper::CreateDualStackSocket( + std::function socket_factory, + const experimental::EventEngine::ResolvedAddress& addr, int type, + int protocol, PosixSocketWrapper::DSMode& dsmode) { + const sockaddr* sock_addr = addr.address(); + int family = sock_addr->sa_family; + int newfd; + if (family == AF_INET6) { + if (IsIpv6LoopbackAvailable()) { + newfd = CreateSocket(socket_factory, family, type, protocol); + } else { + newfd = -1; + errno = EAFNOSUPPORT; + } + if (newfd < 0) { + return ErrorForFd(newfd, addr); + } + PosixSocketWrapper sock(newfd); + // Check if we've got a valid dualstack socket. + if (sock.SetSocketDualStack()) { + dsmode = PosixSocketWrapper::DSMode::DSMODE_DUALSTACK; + return sock; + } + // If this isn't an IPv4 address, then return whatever we've got. + if (!SockaddrIsV4Mapped(&addr, nullptr)) { + dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6; + return sock; + } + // Fall back to AF_INET. + if (newfd >= 0) { + close(newfd); + } + family = AF_INET; + } + dsmode = family == AF_INET ? PosixSocketWrapper::DSMode::DSMODE_IPV4 + : PosixSocketWrapper::DSMode::DSMODE_NONE; + newfd = CreateSocket(socket_factory, family, type, protocol); + if (newfd < 0) { + return ErrorForFd(newfd, addr); + } + return PosixSocketWrapper(newfd); +} + +#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */ + +bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/, + EventEngine::ResolvedAddress* /*resolved_addr4_out*/) { + GPR_ASSERT(false && "unimplemented"); +} + +bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/, + EventEngine::ResolvedAddress* /*resolved_addr6_out*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::StatusOr SockaddrToString( + const EventEngine::ResolvedAddress* /*resolved_addr*/, bool /*normalize*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::StatusOr PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketZeroCopy() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketNonBlocking(int /*non_blocking*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketCloexec(int /*close_on_exec*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketReuseAddr(int /*reuse*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketLowLatency(int /*low_latency*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketReusePort(int /*reuse*/) { + GPR_ASSERT(false && "unimplemented"); +} + +void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool /*enable*/, + int /*timeout*/, + bool /*is_client*/) {} + +void PosixSocketWrapper::TrySetSocketTcpUserTimeout( + const PosixTcpOptions& /*options*/, bool /*is_client*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketSndBuf(int /*buffer_size_bytes*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketRcvBuf(int /*buffer_size_bytes*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::SetSocketMutator( + grpc_fd_usage /*usage*/, grpc_socket_mutator* /*mutator*/) { + GPR_ASSERT(false && "unimplemented"); +} + +absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions( + grpc_fd_usage /*usage*/, const PosixTcpOptions& /*options*/) { + GPR_ASSERT(false && "unimplemented"); +} + +bool PosixSocketWrapper::SetSocketDualStack() { + GPR_ASSERT(false && "unimplemented"); +} + +static bool PosixSocketWrapper::IsSocketReusePortSupported() { + GPR_ASSERT(false && "unimplemented"); +} + +static bool PosixSocketWrapper::IsIpv6LoopbackAvailable() { + GPR_ASSERT(false && "unimplemented"); +} + +static absl::StatusOr +PosixSocketWrapper::CreateDualStackSocket( + std::function + /* socket_factory */, + const experimental::EventEngine::ResolvedAddress& /*addr*/, int /*type*/, + int /*protocol*/, DSMode& /*dsmode*/) { + GPR_ASSERT(false && "unimplemented"); +} +} + +#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */ + +} // namespace posix_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h new file mode 100644 index 00000000000..8a030b4fa48 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/tcp_socket_utils.h @@ -0,0 +1,311 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H +#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H + +#include + +#include + +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/utility/utility.h" + +#include +#include +#include +#include + +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/port.h" +#include "src/core/lib/iomgr/socket_mutator.h" +#include "src/core/lib/resource_quota/resource_quota.h" + +#ifdef GRPC_LINUX_ERRQUEUE +#ifndef SO_ZEROCOPY +#define SO_ZEROCOPY 60 +#endif +#ifndef SO_EE_ORIGIN_ZEROCOPY +#define SO_EE_ORIGIN_ZEROCOPY 5 +#endif +#endif /* ifdef GRPC_LINUX_ERRQUEUE */ + +namespace grpc_event_engine { +namespace posix_engine { + +using ::grpc_event_engine::experimental::EventEngine; + +struct PosixTcpOptions { + static constexpr int kDefaultReadChunkSize = 8192; + static constexpr int kDefaultMinReadChunksize = 256; + static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024; + static constexpr int kZerocpTxEnabledDefault = 0; + static constexpr int kMaxChunkSize = 32 * 1024 * 1024; + static constexpr int kDefaultMaxSends = 4; + static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; + int tcp_read_chunk_size = kDefaultReadChunkSize; + int tcp_min_read_chunk_size = kDefaultMinReadChunksize; + int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; + int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; + int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; + bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; + int keep_alive_time_ms = 0; + int keep_alive_timeout_ms = 0; + bool expand_wildcard_addrs = false; + bool allow_reuse_port = false; + grpc_core::RefCountedPtr resource_quota; + struct grpc_socket_mutator* socket_mutator = nullptr; + PosixTcpOptions() = default; + // Move ctor + PosixTcpOptions(PosixTcpOptions&& other) noexcept { + socket_mutator = absl::exchange(other.socket_mutator, nullptr); + resource_quota = std::move(other.resource_quota); + CopyIntegerOptions(other); + } + // Move assignment + PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept { + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + } + socket_mutator = absl::exchange(other.socket_mutator, nullptr); + resource_quota = std::move(other.resource_quota); + CopyIntegerOptions(other); + return *this; + } + // Copy ctor + PosixTcpOptions(const PosixTcpOptions& other) { + if (other.socket_mutator != nullptr) { + socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); + } + resource_quota = other.resource_quota; + CopyIntegerOptions(other); + } + // Copy assignment + PosixTcpOptions& operator=(const PosixTcpOptions& other) { + if (&other == this) { + return *this; + } + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + socket_mutator = nullptr; + } + if (other.socket_mutator != nullptr) { + socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); + } + resource_quota = other.resource_quota; + CopyIntegerOptions(other); + return *this; + } + // Destructor. + ~PosixTcpOptions() { + if (socket_mutator != nullptr) { + grpc_socket_mutator_unref(socket_mutator); + } + } + + private: + void CopyIntegerOptions(const PosixTcpOptions& other) { + tcp_read_chunk_size = other.tcp_read_chunk_size; + tcp_min_read_chunk_size = other.tcp_min_read_chunk_size; + tcp_max_read_chunk_size = other.tcp_max_read_chunk_size; + tcp_tx_zerocopy_send_bytes_threshold = + other.tcp_tx_zerocopy_send_bytes_threshold; + tcp_tx_zerocopy_max_simultaneous_sends = + other.tcp_tx_zerocopy_max_simultaneous_sends; + tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled; + keep_alive_time_ms = other.keep_alive_time_ms; + keep_alive_timeout_ms = other.keep_alive_timeout_ms; + expand_wildcard_addrs = other.expand_wildcard_addrs; + allow_reuse_port = other.allow_reuse_port; + } +}; + +PosixTcpOptions TcpOptionsFromEndpointConfig( + const grpc_event_engine::experimental::EndpointConfig& config); + +// a wrapper for accept or accept4 +int Accept4(int sockfd, + grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, + int nonblock, int cloexec); + +// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the +// ::ffff:0.0.0.0/96 range, or false otherwise. + +// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied +// here when returning true. +bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, + EventEngine::ResolvedAddress* resolved_addr4_out); + +// If resolved_addr is an AF_INET address, writes the corresponding +// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise +// returns false. +bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr, + EventEngine::ResolvedAddress* resolved_addr6_out); + +// Converts a EventEngine::ResolvedAddress into a newly-allocated human-readable +// string. +// +// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are recognized. +// If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are +// displayed as plain IPv4. +absl::StatusOr SockaddrToString( + const EventEngine::ResolvedAddress* resolved_addr, bool normalize); + +class PosixSocketWrapper { + public: + explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); } + + ~PosixSocketWrapper() = default; + + // Instruct the kernel to wait for specified number of bytes to be received on + // the socket before generating an interrupt for packet receive. If the call + // succeeds, it returns the number of bytes (wait threshold) that was actually + // set. + absl::StatusOr SetSocketRcvLowat(int bytes); + + // Set socket to use zerocopy + absl::Status SetSocketZeroCopy(); + + // Set socket to non blocking mode + absl::Status SetSocketNonBlocking(int non_blocking); + + // Set socket to close on exec + absl::Status SetSocketCloexec(int close_on_exec); + + // Set socket to reuse old addresses + absl::Status SetSocketReuseAddr(int reuse); + + // Disable nagle algorithm + absl::Status SetSocketLowLatency(int low_latency); + + // Set SO_REUSEPORT + absl::Status SetSocketReusePort(int reuse); + + // Override default Tcp user timeout values if necessary. + void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options, + bool is_client); + + // Tries to set SO_NOSIGPIPE if available on this platform. + // If SO_NO_SIGPIPE is not available, returns not OK status. + absl::Status SetSocketNoSigpipeIfPossible(); + + // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not + // available, returns not OK status. + absl::Status SetSocketIpPktInfoIfPossible(); + + // Tries to set IPV6_RECVPKTINFO if available on this platform. If + // IPV6_RECVPKTINFO is not available, returns not OK status. + absl::Status SetSocketIpv6RecvPktInfoIfPossible(); + + // Tries to set the socket's send buffer to given size. + absl::Status SetSocketSndBuf(int buffer_size_bytes); + + // Tries to set the socket's receive buffer to given size. + absl::Status SetSocketRcvBuf(int buffer_size_bytes); + + // Tries to set the socket using a grpc_socket_mutator + absl::Status SetSocketMutator(grpc_fd_usage usage, + grpc_socket_mutator* mutator); + + // Extracts the first socket mutator from config if any and applies on the fd. + absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage, + const PosixTcpOptions& options); + + // Return LocalAddress as EventEngine::ResolvedAddress + absl::StatusOr LocalAddress(); + + // Return PeerAddress as EventEngine::ResolvedAddress + absl::StatusOr PeerAddress(); + + // Return LocalAddress as string + absl::StatusOr LocalAddressString(); + + // Return PeerAddress as string + absl::StatusOr PeerAddressString(); + + // An enum to keep track of IPv4/IPv6 socket modes. + + // Currently, this information is only used when a socket is first created, + // but in the future we may wish to store it alongside the fd. This would let + // calls like sendto() know which family to use without asking the kernel + // first. + enum DSMode { + // Uninitialized, or a non-IP socket. + DSMODE_NONE, + // AF_INET only. + DSMODE_IPV4, + // AF_INET6 only, because IPV6_V6ONLY could not be cleared. + DSMODE_IPV6, + // AF_INET6, which also supports ::ffff-mapped IPv4 addresses. + DSMODE_DUALSTACK + }; + + // Tries to set the socket to dualstack. Returns true on success. + bool SetSocketDualStack(); + + // Returns the underlying file-descriptor. + int Fd() const { return fd_; } + + // Static methods + + // Configure default values for tcp user timeout to be used by client + // and server side sockets. + static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout, + bool is_client); + + // Return true if SO_REUSEPORT is supported + static bool IsSocketReusePortSupported(); + + // Returns true if this system can create AF_INET6 sockets bound to ::1. + // The value is probed once, and cached for the life of the process. + + // This is more restrictive than checking for socket(AF_INET6) to succeed, + // because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create + // and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port + // without a valid loopback interface. Rather than expose this half-broken + // state to library users, we turn off IPv6 sockets. + static bool IsIpv6LoopbackAvailable(); + + // Creates a new socket for connecting to (or listening on) an address. + + // If addr is AF_INET6, this creates an IPv6 socket first. If that fails, + // and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to + // an IPv4 socket. + + // If addr is AF_INET, AF_UNIX, or anything else, then this is similar to + // calling socket() directly. + + // Returns an PosixSocketWrapper on success, otherwise returns a not-OK + // absl::Status + + // The dsmode output indicates which address family was actually created. + static absl::StatusOr CreateDualStackSocket( + std::function + socket_factory, + const experimental::EventEngine::ResolvedAddress& addr, int type, + int protocol, DSMode& dsmode); + + private: + int fd_; +}; + +} // namespace posix_engine +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H diff --git a/test/core/event_engine/posix/BUILD b/test/core/event_engine/posix/BUILD index 4d5a94ef997..24b8f487c27 100644 --- a/test/core/event_engine/posix/BUILD +++ b/test/core/event_engine/posix/BUILD @@ -130,3 +130,21 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "tcp_posix_socket_utils_test", + srcs = ["tcp_posix_socket_utils_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "no_windows", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:event_engine_common", + "//:posix_event_engine_tcp_socket_utils", + "//:socket_mutator", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc new file mode 100644 index 00000000000..7a9cb2fe1da --- /dev/null +++ b/test/core/event_engine/posix/tcp_posix_socket_utils_test.cc @@ -0,0 +1,382 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "absl/status/status.h" + +#include "src/core/lib/iomgr/port.h" + +// This test won't work except with posix sockets enabled +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON + +#include +#include +#include +#include +#ifdef GRPC_HAVE_UNIX_SOCKET +#include +#endif + +#include +#include + +#include +#include +#include + +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/socket_mutator.h" + +namespace grpc_event_engine { +namespace posix_engine { + +namespace { + +struct test_socket_mutator { + grpc_socket_mutator base; + int option_value; +}; + +bool MutateFd(int fd, grpc_socket_mutator* mutator) { + int newval; + socklen_t intlen = sizeof(newval); + struct test_socket_mutator* m = + reinterpret_cast(mutator); + + if (0 != setsockopt(fd, IPPROTO_IP, IP_TOS, &m->option_value, + sizeof(m->option_value))) { + return false; + } + if (0 != getsockopt(fd, IPPROTO_IP, IP_TOS, &newval, &intlen)) { + return false; + } + if (newval != m->option_value) { + return false; + } + return true; +} + +bool MutateFd2(const grpc_mutate_socket_info* info, + grpc_socket_mutator* mutator) { + int newval; + socklen_t intlen = sizeof(newval); + struct test_socket_mutator* m = + reinterpret_cast(mutator); + + if (0 != setsockopt(info->fd, IPPROTO_IP, IP_TOS, &m->option_value, + sizeof(m->option_value))) { + return false; + } + if (0 != getsockopt(info->fd, IPPROTO_IP, IP_TOS, &newval, &intlen)) { + return false; + } + if (newval != m->option_value) { + return false; + } + return true; +} + +void DestroyTestMutator(grpc_socket_mutator* mutator) { + struct test_socket_mutator* m = + reinterpret_cast(mutator); + gpr_free(m); +} + +int CompareTestMutator(grpc_socket_mutator* a, grpc_socket_mutator* b) { + struct test_socket_mutator* ma = + reinterpret_cast(a); + struct test_socket_mutator* mb = + reinterpret_cast(b); + return grpc_core::QsortCompare(ma->option_value, mb->option_value); +} + +const grpc_socket_mutator_vtable mutator_vtable = {MutateFd, CompareTestMutator, + DestroyTestMutator, nullptr}; + +const grpc_socket_mutator_vtable mutator_vtable2 = { + nullptr, CompareTestMutator, DestroyTestMutator, MutateFd2}; + +const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0xff, 0xff, 192, 0, 2, 1}; + +const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0xff, 0xfe, 192, 0, 2, 99}; +const uint8_t kIPv4[] = {192, 0, 2, 1}; + +const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 1}; + +EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) { + EventEngine::ResolvedAddress resolved_addr4; + sockaddr_in* addr4 = reinterpret_cast( + const_cast(resolved_addr4.address())); + memset(&resolved_addr4, 0, sizeof(resolved_addr4)); + addr4->sin_family = AF_INET; + GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr)); + memcpy(&addr4->sin_addr.s_addr, data, data_len); + addr4->sin_port = htons(12345); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr4), + static_cast(sizeof(sockaddr_in))); +} + +EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) { + EventEngine::ResolvedAddress resolved_addr6; + sockaddr_in6* addr6 = reinterpret_cast( + const_cast(resolved_addr6.address())); + memset(&resolved_addr6, 0, sizeof(resolved_addr6)); + addr6->sin6_family = AF_INET6; + GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr)); + memcpy(&addr6->sin6_addr.s6_addr, data, data_len); + addr6->sin6_port = htons(12345); + return EventEngine::ResolvedAddress( + reinterpret_cast(addr6), + static_cast(sizeof(sockaddr_in6))); +} + +void SetIPv6ScopeId(EventEngine::ResolvedAddress* addr, uint32_t scope_id) { + sockaddr_in6* addr6 = + reinterpret_cast(const_cast(addr->address())); + ASSERT_EQ(addr6->sin6_family, AF_INET6); + addr6->sin6_scope_id = scope_id; +} + +#ifdef GRPC_HAVE_UNIX_SOCKET +absl::StatusOr UnixSockaddrPopulate( + absl::string_view path) { + EventEngine::ResolvedAddress resolved_addr; + memset(const_cast(resolved_addr.address()), 0, + resolved_addr.size()); + struct sockaddr_un* un = reinterpret_cast( + const_cast(resolved_addr.address())); + const size_t maxlen = sizeof(un->sun_path) - 1; + if (path.size() > maxlen) { + return absl::InternalError(absl::StrCat( + "Path name should not have more than ", maxlen, " characters")); + } + un->sun_family = AF_UNIX; + path.copy(un->sun_path, path.size()); + un->sun_path[path.size()] = '\0'; + return EventEngine::ResolvedAddress(reinterpret_cast(un), + static_cast(sizeof(*un))); +} + +absl::StatusOr UnixAbstractSockaddrPopulate( + absl::string_view path) { + EventEngine::ResolvedAddress resolved_addr; + memset(const_cast(resolved_addr.address()), 0, + resolved_addr.size()); + struct sockaddr* addr = const_cast(resolved_addr.address()); + struct sockaddr_un* un = reinterpret_cast(addr); + const size_t maxlen = sizeof(un->sun_path) - 1; + if (path.size() > maxlen) { + return absl::InternalError(absl::StrCat( + "Path name should not have more than ", maxlen, " characters")); + } + un->sun_family = AF_UNIX; + un->sun_path[0] = '\0'; + path.copy(un->sun_path + 1, path.size()); +#ifdef GPR_APPLE + return EventEngine::ResolvedAddress( + addr, static_cast(sizeof(un->sun_len) + + sizeof(un->sun_family) + path.size() + 1)); +#else + return EventEngine::ResolvedAddress( + addr, static_cast(sizeof(un->sun_family) + path.size() + 1)); +#endif +} +#endif + +} // namespace + +TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) { + auto test_with_vtable = [](const grpc_socket_mutator_vtable* vtable) { + int sock = socket(PF_INET, SOCK_STREAM, 0); + EXPECT_GT(sock, 0); + PosixSocketWrapper posix_sock(sock); + struct test_socket_mutator mutator; + grpc_socket_mutator_init(&mutator.base, vtable); + + mutator.option_value = IPTOS_LOWDELAY; + EXPECT_TRUE( + posix_sock + .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) + .ok()); + mutator.option_value = IPTOS_THROUGHPUT; + EXPECT_TRUE( + posix_sock + .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) + .ok()); + + mutator.option_value = IPTOS_RELIABILITY; + EXPECT_TRUE( + posix_sock + .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) + .ok()); + + mutator.option_value = -1; + EXPECT_FALSE( + posix_sock + .SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE, + reinterpret_cast(&mutator)) + .ok()); + close(sock); + }; + test_with_vtable(&mutator_vtable); + test_with_vtable(&mutator_vtable2); +} + +TEST(TcpPosixSocketUtilsTest, SocketOptionsTest) { + int sock = socket(PF_INET, SOCK_STREAM, 0); + EXPECT_GT(sock, 0); + PosixSocketWrapper posix_sock(sock); + EXPECT_TRUE(posix_sock.SetSocketNonBlocking(1).ok()); + EXPECT_TRUE(posix_sock.SetSocketNonBlocking(0).ok()); + EXPECT_TRUE(posix_sock.SetSocketCloexec(1).ok()); + EXPECT_TRUE(posix_sock.SetSocketCloexec(0).ok()); + EXPECT_TRUE(posix_sock.SetSocketReuseAddr(1).ok()); + EXPECT_TRUE(posix_sock.SetSocketReuseAddr(0).ok()); + EXPECT_TRUE(posix_sock.SetSocketLowLatency(1).ok()); + EXPECT_TRUE(posix_sock.SetSocketLowLatency(0).ok()); + close(sock); +} + +TEST(SockAddrUtilsTest, SockAddrIsV4MappedTest) { + // v4mapped input should succeed. + EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_TRUE(SockaddrIsV4Mapped(&input6, nullptr)); + EventEngine::ResolvedAddress output4; + ASSERT_TRUE(SockaddrIsV4Mapped(&input6, &output4)); + EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); + + // Non-v4mapped input should fail. + input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); + ASSERT_FALSE(SockaddrIsV4Mapped(&input6, nullptr)); + ASSERT_FALSE(SockaddrIsV4Mapped(&input6, &output4)); + // Output is unchanged. + ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0); + + // Plain IPv4 input should also fail. + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + ASSERT_FALSE(SockaddrIsV4Mapped(&input4, nullptr)); +} + +TEST(TcpPosixSocketUtilsTest, SockAddrToV4MappedTest) { + // IPv4 input should succeed. + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + EventEngine::ResolvedAddress output6; + ASSERT_TRUE(SockaddrToV4Mapped(&input4, &output6)); + EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); + + // IPv6 input should fail. + EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); + ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6)); + // Output is unchanged. + ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0); + + // Already-v4mapped input should also fail. + input6 = MakeAddr6(kMapped, sizeof(kMapped)); + ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6)); +} + +TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) { + errno = 0x7EADBEEF; + + EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4)); + EXPECT_EQ(SockaddrToString(&input4, false).value(), "192.0.2.1:12345"); + EXPECT_EQ(SockaddrToString(&input4, true).value(), "192.0.2.1:12345"); + + EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6)); + EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1]:12345"); + EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1]:12345"); + + SetIPv6ScopeId(&input6, 2); + EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1%2]:12345"); + EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%2]:12345"); + + SetIPv6ScopeId(&input6, 101); + EXPECT_EQ(SockaddrToString(&input6, false).value(), + "[2001:db8::1%101]:12345"); + EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%101]:12345"); + + EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped)); + EXPECT_EQ(SockaddrToString(&input6x, false).value(), + "[::ffff:192.0.2.1]:12345"); + EXPECT_EQ(SockaddrToString(&input6x, true).value(), "192.0.2.1:12345"); + + EventEngine::ResolvedAddress input6y = + MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped)); + EXPECT_EQ(SockaddrToString(&input6y, false).value(), + "[::fffe:c000:263]:12345"); + EXPECT_EQ(SockaddrToString(&input6y, true).value(), + "[::fffe:c000:263]:12345"); + + EventEngine::ResolvedAddress phony; + memset(const_cast(phony.address()), 0, phony.size()); + sockaddr* phony_addr = const_cast(phony.address()); + phony_addr->sa_family = 123; + EXPECT_EQ(SockaddrToString(&phony, false).status(), + absl::InvalidArgumentError("Unknown sockaddr family: 123")); + EXPECT_EQ(SockaddrToString(&phony, true).status(), + absl::InvalidArgumentError("Unknown sockaddr family: 123")); + +#ifdef GRPC_HAVE_UNIX_SOCKET + EventEngine::ResolvedAddress inputun = + *UnixSockaddrPopulate("/some/unix/path"); + struct sockaddr_un* sock_un = reinterpret_cast( + const_cast(inputun.address())); + EXPECT_EQ(SockaddrToString(&inputun, true).value(), "/some/unix/path"); + + std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x'); + inputun = *UnixSockaddrPopulate(max_filepath); + EXPECT_EQ(SockaddrToString(&inputun, true).value(), max_filepath); + + inputun = *UnixSockaddrPopulate(max_filepath); + sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x'; + EXPECT_EQ(SockaddrToString(&inputun, true).status(), + absl::InvalidArgumentError("UDS path is not null-terminated")); + + EventEngine::ResolvedAddress inputun2 = + *UnixAbstractSockaddrPopulate("some_unix_path"); + EXPECT_EQ(SockaddrToString(&inputun2, true).value(), + absl::StrCat(std::string(1, '\0'), "some_unix_path")); + + std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0'); + EventEngine::ResolvedAddress inputun3 = + *UnixAbstractSockaddrPopulate(max_abspath); + EXPECT_EQ(SockaddrToString(&inputun3, true).value(), + absl::StrCat(std::string(1, '\0'), max_abspath)); +#endif +} + +} // namespace posix_engine +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */ + +int main(int argc, char** argv) { return 1; } + +#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 189092557cc..57f30f9ed27 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -6801,6 +6801,28 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "tcp_posix_socket_utils_test", + "platforms": [ + "linux", + "mac", + "posix" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,