Revert "Forking tcp socket utils for posix event engine endpoints (#30383)" (#30946)

This reverts commit d7cce32449.
pull/30479/head
Richard Belleville 2 years ago committed by GitHub
parent 95aeffcb73
commit 33b3fe89c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 44
      BUILD
  2. 42
      CMakeLists.txt
  3. 18
      build_autogenerated.yaml
  4. 883
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  5. 311
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  6. 18
      test/core/event_engine/posix/BUILD
  7. 382
      test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
  8. 22
      tools/run_tests/generated/tests.json

44
BUILD

@ -2788,34 +2788,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "posix_event_engine_tcp_socket_utils",
srcs = [
"src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc",
],
hdrs = [
"src/core/lib/event_engine/posix_engine/tcp_socket_utils.h",
],
external_deps = [
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
"absl/utility",
],
deps = [
"event_engine_base_hdrs",
"gpr",
"grpc_codegen",
"iomgr_port",
"ref_counted_ptr",
"resource_quota",
"socket_mutator",
"useful",
],
)
grpc_cc_library(
name = "posix_event_engine",
srcs = ["src/core/lib/event_engine/posix_engine/posix_engine.cc"],
@ -3043,22 +3015,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "socket_mutator",
srcs = [
"src/core/lib/iomgr/socket_mutator.cc",
],
hdrs = [
"src/core/lib/iomgr/socket_mutator.h",
],
deps = [
"channel_args",
"gpr",
"grpc_codegen",
"useful",
],
)
grpc_cc_library(
name = "backoff",
srcs = [

42
CMakeLists.txt generated

@ -1180,9 +1180,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx tcp_client_posix_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx tcp_posix_socket_utils_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx tcp_server_posix_test)
endif()
@ -17736,45 +17733,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(tcp_posix_socket_utils_test
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/iomgr/socket_mutator.cc
test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(tcp_posix_socket_utils_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(tcp_posix_socket_utils_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -9767,24 +9767,6 @@ targets:
- linux
- posix
- mac
- name: tcp_posix_socket_utils_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/iomgr/socket_mutator.h
src:
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/iomgr/socket_mutator.cc
- test/core/event_engine/posix/tcp_posix_socket_utils_test.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
uses_polling: false
- name: tcp_server_posix_test
gtest: true
build: test

@ -1,883 +0,0 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#ifdef GRPC_LINUX_TCP_H
#include <linux/tcp.h>
#else
#include <netinet/in.h> // IWYU pragma: keep
#include <netinet/tcp.h>
#endif
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#include <atomic>
#include <cstring>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/host_port.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
#endif
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::EndpointConfig;
using ::grpc_event_engine::experimental::EventEngine;
namespace {
int AdjustValue(int default_value, int min_value, int max_value,
absl::optional<int> actual_value) {
if (!actual_value.has_value() || *actual_value < min_value ||
*actual_value > max_value) {
return default_value;
}
return *actual_value;
}
// The default values for TCP_USER_TIMEOUT are currently configured to be in
// line with the default values of KEEPALIVE_TIMEOUT as proposed in
// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md */
int kDefaultClientUserTimeoutMs = 20000;
int kDefaultServerUserTimeoutMs = 20000;
bool kDefaultClientUserTimeoutEnabled = false;
bool kDefaultServerUserTimeoutEnabled = true;
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
absl::Status ErrorForFd(
int fd, const experimental::EventEngine::ResolvedAddress& addr) {
if (fd >= 0) return absl::OkStatus();
const char* addr_str = reinterpret_cast<const char*>(addr.address());
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("socket: ", strerror(errno),
std::string(addr_str, addr.size())));
}
int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,
int type, int protocol) {
return socket_factory != nullptr ? socket_factory(family, type, protocol)
: socket(family, type, protocol);
}
const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */
} // namespace
PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
void* value;
PosixTcpOptions options;
options.tcp_read_chunk_size = AdjustValue(
PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE));
options.tcp_min_read_chunk_size =
AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1,
PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE));
options.tcp_max_read_chunk_size =
AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1,
PosixTcpOptions::kMaxChunkSize,
config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE));
options.tcp_tx_zerocopy_send_bytes_threshold =
AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD));
options.tcp_tx_zerocopy_max_simultaneous_sends =
AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
options.tcp_tx_zero_copy_enabled =
(AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);
options.keep_alive_time_ms =
AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS));
options.keep_alive_timeout_ms =
AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS));
options.expand_wildcard_addrs =
(AdjustValue(0, 1, INT_MAX,
config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
options.allow_reuse_port =
(AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
0);
if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;
}
options.tcp_read_chunk_size = grpc_core::Clamp(
options.tcp_read_chunk_size, options.tcp_min_read_chunk_size,
options.tcp_max_read_chunk_size);
value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
if (value != nullptr) {
options.resource_quota =
reinterpret_cast<grpc_core::ResourceQuota*>(value)->Ref();
}
value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR);
if (value != nullptr) {
options.socket_mutator =
grpc_socket_mutator_ref(static_cast<grpc_socket_mutator*>(value));
}
return options;
}
#ifdef GRPC_POSIX_SOCKETUTILS
int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec) {
int fd, flags;
socklen_t len = addr.size();
fd = accept(sockfd, const_cast<sockaddr*>(addr.address()), &len);
if (fd >= 0) {
if (nonblock) {
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) goto close_and_error;
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) goto close_and_error;
}
if (cloexec) {
flags = fcntl(fd, F_GETFD, 0);
if (flags < 0) goto close_and_error;
if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != 0) goto close_and_error;
}
}
return fd;
close_and_error:
close(fd);
return -1;
}
#elif GRPC_LINUX_SOCKETUTILS
int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec) {
int flags = 0;
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
socklen_t len = addr.size();
return accept4(sockfd, const_cast<sockaddr*>(addr.address()), &len, flags);
}
#endif /* GRPC_LINUX_SOCKETUTILS */
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out) {
const sockaddr* addr = resolved_addr->address();
if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
sockaddr_in* addr4_out =
resolved_addr4_out == nullptr
? nullptr
: reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4_out->address()));
if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix,
sizeof(kV4MappedPrefix)) == 0) {
if (resolved_addr4_out != nullptr) {
// Normalize ::ffff:0.0.0.0/96 to IPv4.
memset(addr4_out, 0, sizeof(sockaddr_in));
addr4_out->sin_family = AF_INET;
// s6_addr32 would be nice, but it's non-standard.
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
addr4_out->sin_port = addr6->sin6_port;
*resolved_addr4_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4_out),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
return true;
}
}
return false;
}
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out) {
GPR_ASSERT(resolved_addr != resolved_addr6_out);
const sockaddr* addr = resolved_addr->address();
sockaddr_in6* addr6_out = const_cast<sockaddr_in6*>(
reinterpret_cast<const sockaddr_in6*>(resolved_addr6_out->address()));
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out));
addr6_out->sin6_family = AF_INET6;
memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12);
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
*resolved_addr6_out = EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6_out),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
return true;
}
return false;
}
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* resolved_addr, bool normalize) {
const int save_errno = errno;
EventEngine::ResolvedAddress addr_normalized;
if (normalize && SockaddrIsV4Mapped(resolved_addr, &addr_normalized)) {
resolved_addr = &addr_normalized;
}
const sockaddr* addr =
reinterpret_cast<const sockaddr*>(resolved_addr->address());
std::string out;
#ifdef GRPC_HAVE_UNIX_SOCKET
if (addr->sa_family == AF_UNIX) {
const sockaddr_un* addr_un = reinterpret_cast<const sockaddr_un*>(addr);
bool abstract = addr_un->sun_path[0] == '\0';
if (abstract) {
#ifdef GPR_APPLE
int len = resolved_addr->size() - sizeof(addr_un->sun_family) -
sizeof(addr_un->sun_len);
#else
int len = resolved_addr->size() - sizeof(addr_un->sun_family);
#endif
if (len <= 0) {
return absl::InvalidArgumentError("Empty UDS abstract path");
}
out = std::string(addr_un->sun_path, len);
} else {
size_t maxlen = sizeof(addr_un->sun_path);
if (strnlen(addr_un->sun_path, maxlen) == maxlen) {
return absl::InvalidArgumentError("UDS path is not null-terminated");
}
out = std::string(addr_un->sun_path);
}
return out;
}
#endif
const void* ip = nullptr;
int port = 0;
uint32_t sin6_scope_id = 0;
if (addr->sa_family == AF_INET) {
const sockaddr_in* addr4 = reinterpret_cast<const sockaddr_in*>(addr);
ip = &addr4->sin_addr;
port = ntohs(addr4->sin_port);
} else if (addr->sa_family == AF_INET6) {
const sockaddr_in6* addr6 = reinterpret_cast<const sockaddr_in6*>(addr);
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
sin6_scope_id = addr6->sin6_scope_id;
}
char ntop_buf[INET6_ADDRSTRLEN];
if (ip != nullptr &&
inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != nullptr) {
if (sin6_scope_id != 0) {
// Enclose sin6_scope_id with the format defined in RFC 6874 section 2.
std::string host_with_scope =
absl::StrFormat("%s%%%" PRIu32, ntop_buf, sin6_scope_id);
out = grpc_core::JoinHostPort(host_with_scope, port);
} else {
out = grpc_core::JoinHostPort(ntop_buf, port);
}
} else {
return absl::InvalidArgumentError(
absl::StrCat("Unknown sockaddr family: ", addr->sa_family));
}
// This is probably redundant, but we wouldn't want to log the wrong error.
errno = save_errno;
return out;
}
// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually
// set.
absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int bytes) {
if (setsockopt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes, sizeof(bytes)) != 0) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_RCVLOWAT): ", strerror(errno)));
}
return bytes;
}
// Set a socket to use zerocopy
absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
#ifdef GRPC_LINUX_ERRQUEUE
const int enable = 1;
auto err = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
if (err != 0) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_ZEROCOPY): ", strerror(errno)));
}
return absl::OkStatus();
#else
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_ZEROCOPY): ", strerror(ENOSYS)));
#endif
}
// Set a socket to non blocking mode
absl::Status PosixSocketWrapper::SetSocketNonBlocking(int non_blocking) {
int oldflags = fcntl(fd_, F_GETFL, 0);
if (oldflags < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
if (non_blocking) {
oldflags |= O_NONBLOCK;
} else {
oldflags &= ~O_NONBLOCK;
}
if (fcntl(fd_, F_SETFL, oldflags) != 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
return absl::OkStatus();
}
absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
#ifdef GRPC_HAVE_SO_NOSIGPIPE
int val = 1;
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_NOSIGPIPE): ", strerror(errno)));
}
if (0 != getsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(SO_NOSIGPIPE): ", strerror(errno)));
}
if ((newval != 0) != (val != 0)) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set SO_NOSIGPIPE");
}
#endif
return absl::OkStatus();
}
absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
#ifdef GRPC_HAVE_IP_PKTINFO
int get_local_ip = 1;
if (0 != setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(IP_PKTINFO): ", strerror(errno)));
}
#endif
return absl::OkStatus();
}
absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
#ifdef GRPC_HAVE_IPV6_RECVPKTINFO
int get_local_ip = 1;
if (0 != setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
sizeof(get_local_ip))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(IPV6_RECVPKTINFO): ", strerror(errno)));
}
#endif
return absl::OkStatus();
}
absl::Status PosixSocketWrapper::SetSocketSndBuf(int buffer_size_bytes) {
return 0 == setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
sizeof(buffer_size_bytes))
? absl::OkStatus()
: absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_SNDBUF): ", strerror(errno)));
}
absl::Status PosixSocketWrapper::SetSocketRcvBuf(int buffer_size_bytes) {
return 0 == setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
sizeof(buffer_size_bytes))
? absl::OkStatus()
: absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_RCVBUF): ", strerror(errno)));
}
// Set a socket to close on exec
absl::Status PosixSocketWrapper::SetSocketCloexec(int close_on_exec) {
int oldflags = fcntl(fd_, F_GETFD, 0);
if (oldflags < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
if (close_on_exec) {
oldflags |= FD_CLOEXEC;
} else {
oldflags &= ~FD_CLOEXEC;
}
if (fcntl(fd_, F_SETFD, oldflags) != 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("fcntl: ", strerror(errno)));
}
return absl::OkStatus();
}
// set a socket to reuse old addresses
absl::Status PosixSocketWrapper::SetSocketReuseAddr(int reuse) {
int val = (reuse != 0);
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_REUSEADDR): ", strerror(errno)));
}
if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(SO_REUSEADDR): ", strerror(errno)));
}
if ((newval != 0) != val) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set SO_REUSEADDR");
}
return absl::OkStatus();
}
// set a socket to reuse old ports
absl::Status PosixSocketWrapper::SetSocketReusePort(int reuse) {
#ifndef SO_REUSEPORT
return absl::Status(absl::StatusCode::kInternal,
"SO_REUSEPORT unavailable on compiling system");
#else
int val = (reuse != 0);
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(SO_REUSEPORT): ", strerror(errno)));
}
if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(SO_REUSEPORT): ", strerror(errno)));
}
if ((newval != 0) != val) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set SO_REUSEPORT");
}
return absl::OkStatus();
#endif
}
bool PosixSocketWrapper::IsSocketReusePortSupported() {
static bool kSupportSoReusePort = []() -> bool {
int s = socket(AF_INET, SOCK_STREAM, 0);
if (s < 0) {
// This might be an ipv6-only environment in which case
// 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in that
// case
s = socket(AF_INET6, SOCK_STREAM, 0);
}
if (s >= 0) {
PosixSocketWrapper sock(s);
return sock.SetSocketReusePort(1).ok();
} else {
return false;
}
}();
return kSupportSoReusePort;
}
// Disable nagle algorithm
absl::Status PosixSocketWrapper::SetSocketLowLatency(int low_latency) {
int val = (low_latency != 0);
int newval;
socklen_t intlen = sizeof(newval);
if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("setsockopt(TCP_NODELAY): ", strerror(errno)));
}
if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
return absl::Status(
absl::StatusCode::kInternal,
absl::StrCat("getsockopt(TCP_NODELAY): ", strerror(errno)));
}
if ((newval != 0) != val) {
return absl::Status(absl::StatusCode::kInternal,
"Failed to set TCP_NODELAY");
}
return absl::OkStatus();
}
#if GPR_LINUX == 1
// For Linux, it will be detected to support TCP_USER_TIMEOUT
#ifndef TCP_USER_TIMEOUT
#define TCP_USER_TIMEOUT 18
#endif
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#else
// For non-Linux, TCP_USER_TIMEOUT will be used if TCP_USER_TIMEOUT is defined.
#ifdef TCP_USER_TIMEOUT
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#else
#define TCP_USER_TIMEOUT 0
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1
#endif // TCP_USER_TIMEOUT
#endif // GPR_LINUX == 1
// Whether the socket supports TCP_USER_TIMEOUT option.
// (0: don't know, 1: support, -1: not support)
static std::atomic<int> g_socket_supports_tcp_user_timeout(
SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT);
void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool enable,
int timeout,
bool is_client) {
if (is_client) {
kDefaultClientUserTimeoutEnabled = enable;
if (timeout > 0) {
kDefaultClientUserTimeoutMs = timeout;
}
} else {
kDefaultServerUserTimeoutEnabled = enable;
if (timeout > 0) {
kDefaultServerUserTimeoutMs = timeout;
}
}
}
// Set TCP_USER_TIMEOUT
void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
const PosixTcpOptions& options, bool is_client) {
if (g_socket_supports_tcp_user_timeout.load() < 0) {
return;
}
bool enable = is_client ? kDefaultClientUserTimeoutEnabled
: kDefaultServerUserTimeoutEnabled;
int timeout =
is_client ? kDefaultClientUserTimeoutMs : kDefaultServerUserTimeoutMs;
if (options.keep_alive_time_ms > 0) {
enable = options.keep_alive_time_ms != INT_MAX;
}
if (options.keep_alive_timeout_ms > 0) {
timeout = options.keep_alive_timeout_ms;
}
if (enable) {
int newval;
socklen_t len = sizeof(newval);
// If this is the first time to use TCP_USER_TIMEOUT, try to check
// if it is available.
if (g_socket_supports_tcp_user_timeout.load() == 0) {
if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
gpr_log(GPR_INFO,
"TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT won't "
"be used thereafter");
g_socket_supports_tcp_user_timeout.store(-1);
} else {
gpr_log(GPR_INFO,
"TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
"used thereafter");
g_socket_supports_tcp_user_timeout.store(1);
}
}
if (g_socket_supports_tcp_user_timeout.load() > 0) {
if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
sizeof(timeout))) {
gpr_log(GPR_ERROR, "setsockopt(TCP_USER_TIMEOUT) %s", strerror(errno));
return;
}
if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
gpr_log(GPR_ERROR, "getsockopt(TCP_USER_TIMEOUT) %s", strerror(errno));
return;
}
if (newval != timeout) {
// Do not fail on failing to set TCP_USER_TIMEOUT
gpr_log(GPR_ERROR, "Failed to set TCP_USER_TIMEOUT");
return;
}
}
}
}
// Set a socket using a grpc_socket_mutator
absl::Status PosixSocketWrapper::SetSocketMutator(
grpc_fd_usage usage, grpc_socket_mutator* mutator) {
GPR_ASSERT(mutator);
if (!grpc_socket_mutator_mutate_fd(mutator, fd_, usage)) {
return absl::Status(absl::StatusCode::kInternal,
"grpc_socket_mutator failed.");
}
return absl::OkStatus();
}
absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
grpc_fd_usage usage, const PosixTcpOptions& options) {
if (options.socket_mutator == nullptr) {
return absl::OkStatus();
}
return SetSocketMutator(usage, options.socket_mutator);
}
bool PosixSocketWrapper::SetSocketDualStack() {
const int off = 0;
return 0 == setsockopt(fd_, IPPROTO_IPV6, IPV6_V6ONLY, &off, sizeof(off));
}
bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
static bool kIpv6LoopbackAvailable = []() -> bool {
int fd = socket(AF_INET6, SOCK_STREAM, 0);
bool loopback_available = false;
if (fd < 0) {
gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
} else {
sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */
if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0) {
loopback_available = true;
} else {
gpr_log(GPR_INFO,
"Disabling AF_INET6 sockets because ::1 is not available.");
}
close(fd);
}
return loopback_available;
}();
return kIpv6LoopbackAvailable;
}
absl::StatusOr<EventEngine::ResolvedAddress>
PosixSocketWrapper::LocalAddress() {
EventEngine::ResolvedAddress addr;
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
if (getsockname(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
return absl::InternalError(absl::StrCat("getsockname:", strerror(errno)));
}
return addr;
}
absl::StatusOr<EventEngine::ResolvedAddress> PosixSocketWrapper::PeerAddress() {
EventEngine::ResolvedAddress addr;
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
if (getpeername(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
return absl::InternalError(absl::StrCat("getpeername:", strerror(errno)));
}
return addr;
}
absl::StatusOr<std::string> PosixSocketWrapper::LocalAddressString() {
auto status = LocalAddress();
if (!status.ok()) {
return status.status();
}
return SockaddrToString(&(*status), true);
}
absl::StatusOr<std::string> PosixSocketWrapper::PeerAddressString() {
auto status = PeerAddress();
if (!status.ok()) {
return status.status();
}
return SockaddrToString(&(*status), true);
}
absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
std::function<int(int, int, int)> socket_factory,
const experimental::EventEngine::ResolvedAddress& addr, int type,
int protocol, PosixSocketWrapper::DSMode& dsmode) {
const sockaddr* sock_addr = addr.address();
int family = sock_addr->sa_family;
int newfd;
if (family == AF_INET6) {
if (IsIpv6LoopbackAvailable()) {
newfd = CreateSocket(socket_factory, family, type, protocol);
} else {
newfd = -1;
errno = EAFNOSUPPORT;
}
if (newfd < 0) {
return ErrorForFd(newfd, addr);
}
PosixSocketWrapper sock(newfd);
// Check if we've got a valid dualstack socket.
if (sock.SetSocketDualStack()) {
dsmode = PosixSocketWrapper::DSMode::DSMODE_DUALSTACK;
return sock;
}
// If this isn't an IPv4 address, then return whatever we've got.
if (!SockaddrIsV4Mapped(&addr, nullptr)) {
dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6;
return sock;
}
// Fall back to AF_INET.
if (newfd >= 0) {
close(newfd);
}
family = AF_INET;
}
dsmode = family == AF_INET ? PosixSocketWrapper::DSMode::DSMODE_IPV4
: PosixSocketWrapper::DSMode::DSMODE_NONE;
newfd = CreateSocket(socket_factory, family, type, protocol);
if (newfd < 0) {
return ErrorForFd(newfd, addr);
}
return PosixSocketWrapper(newfd);
}
#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
EventEngine::ResolvedAddress* /*resolved_addr4_out*/) {
GPR_ASSERT(false && "unimplemented");
}
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
EventEngine::ResolvedAddress* /*resolved_addr6_out*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* /*resolved_addr*/, bool /*normalize*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketNonBlocking(int /*non_blocking*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketCloexec(int /*close_on_exec*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketReuseAddr(int /*reuse*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketLowLatency(int /*low_latency*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketReusePort(int /*reuse*/) {
GPR_ASSERT(false && "unimplemented");
}
void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool /*enable*/,
int /*timeout*/,
bool /*is_client*/) {}
void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
const PosixTcpOptions& /*options*/, bool /*is_client*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketSndBuf(int /*buffer_size_bytes*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketRcvBuf(int /*buffer_size_bytes*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::SetSocketMutator(
grpc_fd_usage /*usage*/, grpc_socket_mutator* /*mutator*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
grpc_fd_usage /*usage*/, const PosixTcpOptions& /*options*/) {
GPR_ASSERT(false && "unimplemented");
}
bool PosixSocketWrapper::SetSocketDualStack() {
GPR_ASSERT(false && "unimplemented");
}
static bool PosixSocketWrapper::IsSocketReusePortSupported() {
GPR_ASSERT(false && "unimplemented");
}
static bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
GPR_ASSERT(false && "unimplemented");
}
static absl::StatusOr<PosixSocketWrapper>
PosixSocketWrapper::CreateDualStackSocket(
std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
/* socket_factory */,
const experimental::EventEngine::ResolvedAddress& /*addr*/, int /*type*/,
int /*protocol*/, DSMode& /*dsmode*/) {
GPR_ASSERT(false && "unimplemented");
}
}
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */
} // namespace posix_engine
} // namespace grpc_event_engine

@ -1,311 +0,0 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
#include <grpc/support/port_platform.h>
#include <sys/socket.h>
#include <functional>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/utility/utility.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#ifdef GRPC_LINUX_ERRQUEUE
#ifndef SO_ZEROCOPY
#define SO_ZEROCOPY 60
#endif
#ifndef SO_EE_ORIGIN_ZEROCOPY
#define SO_EE_ORIGIN_ZEROCOPY 5
#endif
#endif /* ifdef GRPC_LINUX_ERRQUEUE */
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::EventEngine;
struct PosixTcpOptions {
static constexpr int kDefaultReadChunkSize = 8192;
static constexpr int kDefaultMinReadChunksize = 256;
static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024;
static constexpr int kZerocpTxEnabledDefault = 0;
static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
static constexpr int kDefaultMaxSends = 4;
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
int keep_alive_time_ms = 0;
int keep_alive_timeout_ms = 0;
bool expand_wildcard_addrs = false;
bool allow_reuse_port = false;
grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
struct grpc_socket_mutator* socket_mutator = nullptr;
PosixTcpOptions() = default;
// Move ctor
PosixTcpOptions(PosixTcpOptions&& other) noexcept {
socket_mutator = absl::exchange(other.socket_mutator, nullptr);
resource_quota = std::move(other.resource_quota);
CopyIntegerOptions(other);
}
// Move assignment
PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept {
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
}
socket_mutator = absl::exchange(other.socket_mutator, nullptr);
resource_quota = std::move(other.resource_quota);
CopyIntegerOptions(other);
return *this;
}
// Copy ctor
PosixTcpOptions(const PosixTcpOptions& other) {
if (other.socket_mutator != nullptr) {
socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
}
resource_quota = other.resource_quota;
CopyIntegerOptions(other);
}
// Copy assignment
PosixTcpOptions& operator=(const PosixTcpOptions& other) {
if (&other == this) {
return *this;
}
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
socket_mutator = nullptr;
}
if (other.socket_mutator != nullptr) {
socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
}
resource_quota = other.resource_quota;
CopyIntegerOptions(other);
return *this;
}
// Destructor.
~PosixTcpOptions() {
if (socket_mutator != nullptr) {
grpc_socket_mutator_unref(socket_mutator);
}
}
private:
void CopyIntegerOptions(const PosixTcpOptions& other) {
tcp_read_chunk_size = other.tcp_read_chunk_size;
tcp_min_read_chunk_size = other.tcp_min_read_chunk_size;
tcp_max_read_chunk_size = other.tcp_max_read_chunk_size;
tcp_tx_zerocopy_send_bytes_threshold =
other.tcp_tx_zerocopy_send_bytes_threshold;
tcp_tx_zerocopy_max_simultaneous_sends =
other.tcp_tx_zerocopy_max_simultaneous_sends;
tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled;
keep_alive_time_ms = other.keep_alive_time_ms;
keep_alive_timeout_ms = other.keep_alive_timeout_ms;
expand_wildcard_addrs = other.expand_wildcard_addrs;
allow_reuse_port = other.allow_reuse_port;
}
};
PosixTcpOptions TcpOptionsFromEndpointConfig(
const grpc_event_engine::experimental::EndpointConfig& config);
// a wrapper for accept or accept4
int Accept4(int sockfd,
grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
int nonblock, int cloexec);
// Returns true if resolved_addr is an IPv4-mapped IPv6 address within the
// ::ffff:0.0.0.0/96 range, or false otherwise.
// If resolved_addr4_out is non-NULL, the inner IPv4 address will be copied
// here when returning true.
bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr4_out);
// If resolved_addr is an AF_INET address, writes the corresponding
// ::ffff:0.0.0.0/96 address to resolved_addr6_out and returns true. Otherwise
// returns false.
bool SockaddrToV4Mapped(const EventEngine::ResolvedAddress* resolved_addr,
EventEngine::ResolvedAddress* resolved_addr6_out);
// Converts a EventEngine::ResolvedAddress into a newly-allocated human-readable
// string.
//
// Currently, only the AF_INET, AF_INET6, and AF_UNIX families are recognized.
// If the normalize flag is enabled, ::ffff:0.0.0.0/96 IPv6 addresses are
// displayed as plain IPv4.
absl::StatusOr<std::string> SockaddrToString(
const EventEngine::ResolvedAddress* resolved_addr, bool normalize);
class PosixSocketWrapper {
public:
explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); }
~PosixSocketWrapper() = default;
// Instruct the kernel to wait for specified number of bytes to be received on
// the socket before generating an interrupt for packet receive. If the call
// succeeds, it returns the number of bytes (wait threshold) that was actually
// set.
absl::StatusOr<int> SetSocketRcvLowat(int bytes);
// Set socket to use zerocopy
absl::Status SetSocketZeroCopy();
// Set socket to non blocking mode
absl::Status SetSocketNonBlocking(int non_blocking);
// Set socket to close on exec
absl::Status SetSocketCloexec(int close_on_exec);
// Set socket to reuse old addresses
absl::Status SetSocketReuseAddr(int reuse);
// Disable nagle algorithm
absl::Status SetSocketLowLatency(int low_latency);
// Set SO_REUSEPORT
absl::Status SetSocketReusePort(int reuse);
// Override default Tcp user timeout values if necessary.
void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options,
bool is_client);
// Tries to set SO_NOSIGPIPE if available on this platform.
// If SO_NO_SIGPIPE is not available, returns not OK status.
absl::Status SetSocketNoSigpipeIfPossible();
// Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not
// available, returns not OK status.
absl::Status SetSocketIpPktInfoIfPossible();
// Tries to set IPV6_RECVPKTINFO if available on this platform. If
// IPV6_RECVPKTINFO is not available, returns not OK status.
absl::Status SetSocketIpv6RecvPktInfoIfPossible();
// Tries to set the socket's send buffer to given size.
absl::Status SetSocketSndBuf(int buffer_size_bytes);
// Tries to set the socket's receive buffer to given size.
absl::Status SetSocketRcvBuf(int buffer_size_bytes);
// Tries to set the socket using a grpc_socket_mutator
absl::Status SetSocketMutator(grpc_fd_usage usage,
grpc_socket_mutator* mutator);
// Extracts the first socket mutator from config if any and applies on the fd.
absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage,
const PosixTcpOptions& options);
// Return LocalAddress as EventEngine::ResolvedAddress
absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress();
// Return PeerAddress as EventEngine::ResolvedAddress
absl::StatusOr<EventEngine::ResolvedAddress> PeerAddress();
// Return LocalAddress as string
absl::StatusOr<std::string> LocalAddressString();
// Return PeerAddress as string
absl::StatusOr<std::string> PeerAddressString();
// An enum to keep track of IPv4/IPv6 socket modes.
// Currently, this information is only used when a socket is first created,
// but in the future we may wish to store it alongside the fd. This would let
// calls like sendto() know which family to use without asking the kernel
// first.
enum DSMode {
// Uninitialized, or a non-IP socket.
DSMODE_NONE,
// AF_INET only.
DSMODE_IPV4,
// AF_INET6 only, because IPV6_V6ONLY could not be cleared.
DSMODE_IPV6,
// AF_INET6, which also supports ::ffff-mapped IPv4 addresses.
DSMODE_DUALSTACK
};
// Tries to set the socket to dualstack. Returns true on success.
bool SetSocketDualStack();
// Returns the underlying file-descriptor.
int Fd() const { return fd_; }
// Static methods
// Configure default values for tcp user timeout to be used by client
// and server side sockets.
static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout,
bool is_client);
// Return true if SO_REUSEPORT is supported
static bool IsSocketReusePortSupported();
// Returns true if this system can create AF_INET6 sockets bound to ::1.
// The value is probed once, and cached for the life of the process.
// This is more restrictive than checking for socket(AF_INET6) to succeed,
// because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create
// and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port
// without a valid loopback interface. Rather than expose this half-broken
// state to library users, we turn off IPv6 sockets.
static bool IsIpv6LoopbackAvailable();
// Creates a new socket for connecting to (or listening on) an address.
// If addr is AF_INET6, this creates an IPv6 socket first. If that fails,
// and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to
// an IPv4 socket.
// If addr is AF_INET, AF_UNIX, or anything else, then this is similar to
// calling socket() directly.
// Returns an PosixSocketWrapper on success, otherwise returns a not-OK
// absl::Status
// The dsmode output indicates which address family was actually created.
static absl::StatusOr<PosixSocketWrapper> CreateDualStackSocket(
std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
socket_factory,
const experimental::EventEngine::ResolvedAddress& addr, int type,
int protocol, DSMode& dsmode);
private:
int fd_;
};
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H

@ -130,21 +130,3 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "tcp_posix_socket_utils_test",
srcs = ["tcp_posix_socket_utils_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"no_windows",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//:event_engine_common",
"//:posix_event_engine_tcp_socket_utils",
"//:socket_mutator",
"//test/core/util:grpc_test_util",
],
)

@ -1,382 +0,0 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/socket.h>
#include "absl/status/status.h"
#include "src/core/lib/iomgr/port.h"
// This test won't work except with posix sockets enabled
#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include <errno.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <string.h>
#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
#endif
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/socket_mutator.h"
namespace grpc_event_engine {
namespace posix_engine {
namespace {
struct test_socket_mutator {
grpc_socket_mutator base;
int option_value;
};
bool MutateFd(int fd, grpc_socket_mutator* mutator) {
int newval;
socklen_t intlen = sizeof(newval);
struct test_socket_mutator* m =
reinterpret_cast<struct test_socket_mutator*>(mutator);
if (0 != setsockopt(fd, IPPROTO_IP, IP_TOS, &m->option_value,
sizeof(m->option_value))) {
return false;
}
if (0 != getsockopt(fd, IPPROTO_IP, IP_TOS, &newval, &intlen)) {
return false;
}
if (newval != m->option_value) {
return false;
}
return true;
}
bool MutateFd2(const grpc_mutate_socket_info* info,
grpc_socket_mutator* mutator) {
int newval;
socklen_t intlen = sizeof(newval);
struct test_socket_mutator* m =
reinterpret_cast<struct test_socket_mutator*>(mutator);
if (0 != setsockopt(info->fd, IPPROTO_IP, IP_TOS, &m->option_value,
sizeof(m->option_value))) {
return false;
}
if (0 != getsockopt(info->fd, IPPROTO_IP, IP_TOS, &newval, &intlen)) {
return false;
}
if (newval != m->option_value) {
return false;
}
return true;
}
void DestroyTestMutator(grpc_socket_mutator* mutator) {
struct test_socket_mutator* m =
reinterpret_cast<struct test_socket_mutator*>(mutator);
gpr_free(m);
}
int CompareTestMutator(grpc_socket_mutator* a, grpc_socket_mutator* b) {
struct test_socket_mutator* ma =
reinterpret_cast<struct test_socket_mutator*>(a);
struct test_socket_mutator* mb =
reinterpret_cast<struct test_socket_mutator*>(b);
return grpc_core::QsortCompare(ma->option_value, mb->option_value);
}
const grpc_socket_mutator_vtable mutator_vtable = {MutateFd, CompareTestMutator,
DestroyTestMutator, nullptr};
const grpc_socket_mutator_vtable mutator_vtable2 = {
nullptr, CompareTestMutator, DestroyTestMutator, MutateFd2};
const uint8_t kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xff, 192, 0, 2, 1};
const uint8_t kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xfe, 192, 0, 2, 99};
const uint8_t kIPv4[] = {192, 0, 2, 1};
const uint8_t kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 1};
EventEngine::ResolvedAddress MakeAddr4(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr4;
sockaddr_in* addr4 = reinterpret_cast<sockaddr_in*>(
const_cast<sockaddr*>(resolved_addr4.address()));
memset(&resolved_addr4, 0, sizeof(resolved_addr4));
addr4->sin_family = AF_INET;
GPR_ASSERT(data_len == sizeof(addr4->sin_addr.s_addr));
memcpy(&addr4->sin_addr.s_addr, data, data_len);
addr4->sin_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr4),
static_cast<socklen_t>(sizeof(sockaddr_in)));
}
EventEngine::ResolvedAddress MakeAddr6(const uint8_t* data, size_t data_len) {
EventEngine::ResolvedAddress resolved_addr6;
sockaddr_in6* addr6 = reinterpret_cast<sockaddr_in6*>(
const_cast<sockaddr*>(resolved_addr6.address()));
memset(&resolved_addr6, 0, sizeof(resolved_addr6));
addr6->sin6_family = AF_INET6;
GPR_ASSERT(data_len == sizeof(addr6->sin6_addr.s6_addr));
memcpy(&addr6->sin6_addr.s6_addr, data, data_len);
addr6->sin6_port = htons(12345);
return EventEngine::ResolvedAddress(
reinterpret_cast<sockaddr*>(addr6),
static_cast<socklen_t>(sizeof(sockaddr_in6)));
}
void SetIPv6ScopeId(EventEngine::ResolvedAddress* addr, uint32_t scope_id) {
sockaddr_in6* addr6 =
reinterpret_cast<sockaddr_in6*>(const_cast<sockaddr*>(addr->address()));
ASSERT_EQ(addr6->sin6_family, AF_INET6);
addr6->sin6_scope_id = scope_id;
}
#ifdef GRPC_HAVE_UNIX_SOCKET
absl::StatusOr<EventEngine::ResolvedAddress> UnixSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(resolved_addr.address()));
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
path.copy(un->sun_path, path.size());
un->sun_path[path.size()] = '\0';
return EventEngine::ResolvedAddress(reinterpret_cast<sockaddr*>(un),
static_cast<socklen_t>(sizeof(*un)));
}
absl::StatusOr<EventEngine::ResolvedAddress> UnixAbstractSockaddrPopulate(
absl::string_view path) {
EventEngine::ResolvedAddress resolved_addr;
memset(const_cast<sockaddr*>(resolved_addr.address()), 0,
resolved_addr.size());
struct sockaddr* addr = const_cast<sockaddr*>(resolved_addr.address());
struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(addr);
const size_t maxlen = sizeof(un->sun_path) - 1;
if (path.size() > maxlen) {
return absl::InternalError(absl::StrCat(
"Path name should not have more than ", maxlen, " characters"));
}
un->sun_family = AF_UNIX;
un->sun_path[0] = '\0';
path.copy(un->sun_path + 1, path.size());
#ifdef GPR_APPLE
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_len) +
sizeof(un->sun_family) + path.size() + 1));
#else
return EventEngine::ResolvedAddress(
addr, static_cast<socklen_t>(sizeof(un->sun_family) + path.size() + 1));
#endif
}
#endif
} // namespace
TEST(TcpPosixSocketUtilsTest, SocketMutatorTest) {
auto test_with_vtable = [](const grpc_socket_mutator_vtable* vtable) {
int sock = socket(PF_INET, SOCK_STREAM, 0);
EXPECT_GT(sock, 0);
PosixSocketWrapper posix_sock(sock);
struct test_socket_mutator mutator;
grpc_socket_mutator_init(&mutator.base, vtable);
mutator.option_value = IPTOS_LOWDELAY;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
mutator.option_value = IPTOS_THROUGHPUT;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
mutator.option_value = IPTOS_RELIABILITY;
EXPECT_TRUE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
mutator.option_value = -1;
EXPECT_FALSE(
posix_sock
.SetSocketMutator(GRPC_FD_CLIENT_CONNECTION_USAGE,
reinterpret_cast<grpc_socket_mutator*>(&mutator))
.ok());
close(sock);
};
test_with_vtable(&mutator_vtable);
test_with_vtable(&mutator_vtable2);
}
TEST(TcpPosixSocketUtilsTest, SocketOptionsTest) {
int sock = socket(PF_INET, SOCK_STREAM, 0);
EXPECT_GT(sock, 0);
PosixSocketWrapper posix_sock(sock);
EXPECT_TRUE(posix_sock.SetSocketNonBlocking(1).ok());
EXPECT_TRUE(posix_sock.SetSocketNonBlocking(0).ok());
EXPECT_TRUE(posix_sock.SetSocketCloexec(1).ok());
EXPECT_TRUE(posix_sock.SetSocketCloexec(0).ok());
EXPECT_TRUE(posix_sock.SetSocketReuseAddr(1).ok());
EXPECT_TRUE(posix_sock.SetSocketReuseAddr(0).ok());
EXPECT_TRUE(posix_sock.SetSocketLowLatency(1).ok());
EXPECT_TRUE(posix_sock.SetSocketLowLatency(0).ok());
close(sock);
}
TEST(SockAddrUtilsTest, SockAddrIsV4MappedTest) {
// v4mapped input should succeed.
EventEngine::ResolvedAddress input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(SockaddrIsV4Mapped(&input6, nullptr));
EventEngine::ResolvedAddress output4;
ASSERT_TRUE(SockaddrIsV4Mapped(&input6, &output4));
EventEngine::ResolvedAddress expect4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Non-v4mapped input should fail.
input6 = MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
ASSERT_FALSE(SockaddrIsV4Mapped(&input6, nullptr));
ASSERT_FALSE(SockaddrIsV4Mapped(&input6, &output4));
// Output is unchanged.
ASSERT_EQ(memcmp(expect4.address(), output4.address(), expect4.size()), 0);
// Plain IPv4 input should also fail.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
ASSERT_FALSE(SockaddrIsV4Mapped(&input4, nullptr));
}
TEST(TcpPosixSocketUtilsTest, SockAddrToV4MappedTest) {
// IPv4 input should succeed.
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EventEngine::ResolvedAddress output6;
ASSERT_TRUE(SockaddrToV4Mapped(&input4, &output6));
EventEngine::ResolvedAddress expect6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// IPv6 input should fail.
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6));
// Output is unchanged.
ASSERT_EQ(memcmp(expect6.address(), output6.address(), output6.size()), 0);
// Already-v4mapped input should also fail.
input6 = MakeAddr6(kMapped, sizeof(kMapped));
ASSERT_TRUE(!SockaddrToV4Mapped(&input6, &output6));
}
TEST(TcpPosixSocketUtilsTest, SockAddrToStringTest) {
errno = 0x7EADBEEF;
EventEngine::ResolvedAddress input4 = MakeAddr4(kIPv4, sizeof(kIPv4));
EXPECT_EQ(SockaddrToString(&input4, false).value(), "192.0.2.1:12345");
EXPECT_EQ(SockaddrToString(&input4, true).value(), "192.0.2.1:12345");
EventEngine::ResolvedAddress input6 = MakeAddr6(kIPv6, sizeof(kIPv6));
EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1]:12345");
SetIPv6ScopeId(&input6, 2);
EXPECT_EQ(SockaddrToString(&input6, false).value(), "[2001:db8::1%2]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%2]:12345");
SetIPv6ScopeId(&input6, 101);
EXPECT_EQ(SockaddrToString(&input6, false).value(),
"[2001:db8::1%101]:12345");
EXPECT_EQ(SockaddrToString(&input6, true).value(), "[2001:db8::1%101]:12345");
EventEngine::ResolvedAddress input6x = MakeAddr6(kMapped, sizeof(kMapped));
EXPECT_EQ(SockaddrToString(&input6x, false).value(),
"[::ffff:192.0.2.1]:12345");
EXPECT_EQ(SockaddrToString(&input6x, true).value(), "192.0.2.1:12345");
EventEngine::ResolvedAddress input6y =
MakeAddr6(kNotQuiteMapped, sizeof(kNotQuiteMapped));
EXPECT_EQ(SockaddrToString(&input6y, false).value(),
"[::fffe:c000:263]:12345");
EXPECT_EQ(SockaddrToString(&input6y, true).value(),
"[::fffe:c000:263]:12345");
EventEngine::ResolvedAddress phony;
memset(const_cast<sockaddr*>(phony.address()), 0, phony.size());
sockaddr* phony_addr = const_cast<sockaddr*>(phony.address());
phony_addr->sa_family = 123;
EXPECT_EQ(SockaddrToString(&phony, false).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
EXPECT_EQ(SockaddrToString(&phony, true).status(),
absl::InvalidArgumentError("Unknown sockaddr family: 123"));
#ifdef GRPC_HAVE_UNIX_SOCKET
EventEngine::ResolvedAddress inputun =
*UnixSockaddrPopulate("/some/unix/path");
struct sockaddr_un* sock_un = reinterpret_cast<struct sockaddr_un*>(
const_cast<sockaddr*>(inputun.address()));
EXPECT_EQ(SockaddrToString(&inputun, true).value(), "/some/unix/path");
std::string max_filepath(sizeof(sock_un->sun_path) - 1, 'x');
inputun = *UnixSockaddrPopulate(max_filepath);
EXPECT_EQ(SockaddrToString(&inputun, true).value(), max_filepath);
inputun = *UnixSockaddrPopulate(max_filepath);
sock_un->sun_path[sizeof(sockaddr_un::sun_path) - 1] = 'x';
EXPECT_EQ(SockaddrToString(&inputun, true).status(),
absl::InvalidArgumentError("UDS path is not null-terminated"));
EventEngine::ResolvedAddress inputun2 =
*UnixAbstractSockaddrPopulate("some_unix_path");
EXPECT_EQ(SockaddrToString(&inputun2, true).value(),
absl::StrCat(std::string(1, '\0'), "some_unix_path"));
std::string max_abspath(sizeof(sock_un->sun_path) - 1, '\0');
EventEngine::ResolvedAddress inputun3 =
*UnixAbstractSockaddrPopulate(max_abspath);
EXPECT_EQ(SockaddrToString(&inputun3, true).value(),
absl::StrCat(std::string(1, '\0'), max_abspath));
#endif
}
} // namespace posix_engine
} // namespace grpc_event_engine
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */
int main(int argc, char** argv) { return 1; }
#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

@ -6801,28 +6801,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "tcp_posix_socket_utils_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save