diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index c52e237fa83..04a17677312 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -181,6 +181,33 @@ grpc_error* grpc_set_socket_reuse_port(int fd, int reuse) { #endif } +static gpr_once g_probe_so_reuseport_once = GPR_ONCE_INIT; +static bool g_support_so_reuseport = false; + +/* Probes whether SO_REUSEPORT can be set on a fresh socket and caches the + result. The flag stays false under GPR_MANYLINUX1 or if no socket opens. */ +static void probe_so_reuseport_once(void) { +#ifndef GPR_MANYLINUX1 + int s = socket(AF_INET, SOCK_STREAM, 0); + if (s < 0) { + /* This might be an ipv6-only environment in which case 'socket(AF_INET,..)' + call would fail. Try creating IPv6 socket in that case */ + s = socket(AF_INET6, SOCK_STREAM, 0); + } + if (s >= 0) { + g_support_so_reuseport = GRPC_LOG_IF_ERROR( + "check for SO_REUSEPORT", grpc_set_socket_reuse_port(s, 1)); + close(s); + } +#endif +} + +/* Runs the probe through gpr_once_init and returns the cached flag. */ +bool grpc_is_socket_reuse_port_supported() { + gpr_once_init(&g_probe_so_reuseport_once, probe_so_reuseport_once); + return g_support_so_reuseport; +} + /* disable nagle */ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency) { int val = (low_latency != 0); diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 1f50e8d3156..b3fd58a5302 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -44,6 +44,9 @@ grpc_error* grpc_set_socket_cloexec(int fd, int close_on_exec); /* set a socket to reuse old addresses */ grpc_error* grpc_set_socket_reuse_addr(int fd, int reuse); +/* return true if SO_REUSEPORT is supported */ +bool grpc_is_socket_reuse_port_supported(); + /* disable nagle */ grpc_error* grpc_set_socket_low_latency(int fd, int low_latency); diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 4e1d90e86a2..f11b82f7ab1 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ 
-55,39 +55,18 @@ #include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" -static gpr_once check_init = GPR_ONCE_INIT; -static bool has_so_reuseport = false; - -static void init(void) { -#ifndef GPR_MANYLINUX1 - int s = socket(AF_INET, SOCK_STREAM, 0); - if (s < 0) { - /* This might be an ipv6-only environment in which case 'socket(AF_INET,..)' - call would fail. Try creating IPv6 socket in that case */ - s = socket(AF_INET6, SOCK_STREAM, 0); - } - if (s >= 0) { - has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT", - grpc_set_socket_reuse_port(s, 1)); - close(s); - } -#endif -} - static grpc_error* tcp_server_create(grpc_closure* shutdown_complete, const grpc_channel_args* args, grpc_tcp_server** server) { - gpr_once_init(&check_init, init); - grpc_tcp_server* s = static_cast(gpr_zalloc(sizeof(grpc_tcp_server))); - s->so_reuseport = has_so_reuseport; + s->so_reuseport = grpc_is_socket_reuse_port_supported(); s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == nullptr ? 
0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { - s->so_reuseport = - has_so_reuseport && (args->args[i].value.integer != 0); + s->so_reuseport = grpc_is_socket_reuse_port_supported() && + (args->args[i].value.integer != 0); } else { gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 9990deec7a3..51d17eb1749 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -191,6 +191,9 @@ struct grpc_udp_server { size_t pollset_count; /* opaque object to pass to callbacks */ void* user_data; + + /* latched from grpc_is_socket_reuse_port_supported() at server creation */ + bool so_reuseport; }; static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { @@ -214,6 +217,7 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; + s->so_reuseport = grpc_is_socket_reuse_port_supported(); return s; } @@ -353,7 +357,7 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, /* Prepare a recently-created socket for listening. 
*/ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, const grpc_resolved_address* addr, int rcv_buf_size, - int snd_buf_size) { + int snd_buf_size, bool so_reuseport) { grpc_resolved_address sockname_temp; grpc_sockaddr* addr_ptr = reinterpret_cast(const_cast(addr->addr)); @@ -381,21 +385,6 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, } } - if (bind_socket(socket_factory, fd, addr) < 0) { - char* addr_str; - grpc_sockaddr_to_string(&addr_str, addr, 0); - gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); - gpr_free(addr_str); - goto error; - } - - sockname_temp.len = static_cast(sizeof(struct sockaddr_storage)); - - if (getsockname(fd, reinterpret_cast(sockname_temp.addr), - &sockname_temp.len) < 0) { - goto error; - } - if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", snd_buf_size); @@ -415,6 +404,30 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, gpr_log(GPR_INFO, "Failed to set socket overflow support"); } } + + if (so_reuseport && !grpc_is_unix_socket(addr) && + grpc_set_socket_reuse_port(fd, 1) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to set SO_REUSEPORT for fd %d", fd); + goto error; + } + + if (bind_socket(socket_factory, fd, addr) < 0) { + char* addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); + gpr_free(addr_str); + goto error; + } + + sockname_temp.len = static_cast(sizeof(struct sockaddr_storage)); + + if (getsockname(fd, reinterpret_cast(sockname_temp.addr), + &sockname_temp.len) < 0) { + gpr_log(GPR_ERROR, "Unable to get the address socket %d is bound to: %s", + fd, strerror(errno)); + goto error; + } + return grpc_sockaddr_get_port(&sockname_temp); error: @@ -541,8 +554,8 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int rcv_buf_size, int snd_buf_size) { gpr_log(GPR_DEBUG, "add 
socket %d to server", fd); - int port = - prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); + int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, + snd_buf_size, s->so_reuseport); if (port >= 0) { gpr_mu_lock(&s->mu); s->listeners.emplace_back(s, fd, addr); @@ -557,7 +570,18 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - GrpcUdpHandlerFactory* handler_factory) { + GrpcUdpHandlerFactory* handler_factory, + size_t num_listeners) { + if (num_listeners > 1 && !s->so_reuseport) { + gpr_log(GPR_ERROR, + "Try to have multiple listeners on same port, but SO_REUSEPORT is " + "not supported. Only create 1 listener."); + } + char* addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_log(GPR_DEBUG, "add address: %s to server", addr_str); + gpr_free(addr_str); + int allocated_port1 = -1; int allocated_port2 = -1; int fd; @@ -568,11 +592,12 @@ int grpc_udp_server_add_port(grpc_udp_server* s, grpc_resolved_address addr4_copy; grpc_resolved_address* allocated_addr = nullptr; grpc_resolved_address sockname_temp; - int port; + int port = 0; /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { + /* Loop through existing listeners to find the port in use. */ for (size_t i = 0; i < s->listeners.size(); ++i) { sockname_temp.len = static_cast(sizeof(struct sockaddr_storage)); @@ -581,6 +606,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, &sockname_temp.len)) { port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { + /* Found such a port, update |addr| to reflect this port. 
*/ allocated_addr = static_cast( gpr_malloc(sizeof(grpc_resolved_address))); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); @@ -597,44 +623,73 @@ int grpc_udp_server_add_port(grpc_udp_server* s, } s->handler_factory = handler_factory; - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ - if (grpc_sockaddr_is_wildcard(addr, &port)) { - grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + for (size_t i = 0; i < num_listeners; ++i) { + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + + /* Try listening on IPv6 first. */ + addr = &wild6; + // TODO(rjshade): Test and propagate the returned grpc_error*: + GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( + s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); + allocated_port1 = + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + if (port == 0) { + /* This is the first time to bind to |addr|. If its port is still + * wildcard port, update |addr| with the ephemeral port returned by + * kernel. Thus |addr| can have a specific port in following + * iterations. */ + grpc_sockaddr_set_port(addr, allocated_port1); + port = allocated_port1; + } else if (allocated_port1 >= 0) { + /* The following successfully created socket should have same port as + * the first one. */ + GPR_ASSERT(port == allocated_port1); + } + /* A dualstack socket is created, no need to create corresponding IPV4 + * socket. */ + continue; + } + + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && allocated_port1 > 0) { + /* |port| hasn't been assigned to an ephemeral port yet, |wild4| must + * have a wildcard port. 
Update it with the ephemeral port created + * during binding. */ + grpc_sockaddr_set_port(&wild4, allocated_port1); + port = allocated_port1; + } + /* |wild4| should have been updated with an ephemeral port by now. Use + * this IPV4 address to create an IPV4 socket. */ + addr = &wild4; + } - /* Try listening on IPv6 first. */ - addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); - if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { - goto done; + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } - - /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && allocated_port1 > 0) { - grpc_sockaddr_set_port(&wild4, allocated_port1); + if (dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = &addr4_copy; + } + allocated_port2 = + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); + if (port == 0) { + /* Update |addr| with the ephemeral port returned by kernel. So |addr| + * can have a specific port in following iterations. 
*/ + grpc_sockaddr_set_port(addr, allocated_port2); + port = allocated_port2; + } else if (allocated_port2 >= 0) { + GPR_ASSERT(port == allocated_port2); } - addr = &wild4; - } - - // TODO(rjshade): Test and propagate the returned grpc_error*: - GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( - s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - if (fd < 0) { - gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); - } - if (dsmode == GRPC_DSMODE_IPV4 && - grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = &addr4_copy; } - allocated_port2 = - add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size); -done: gpr_free(allocated_addr); - return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; + return port; } int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 4e384d2cdf6..3656791c1f8 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -86,17 +86,21 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); /* Add a port to the server, returning port number on success, or negative on failure. + Create |num_listeners| sockets for given address to listen on using + SO_REUSEPORT if supported. + The :: and 0.0.0.0 wildcard addresses are treated identically, accepting - both IPv4 and IPv6 connections, but :: is the preferred style. This usually - creates one socket, but possibly two on systems which support IPv6, - but not dualstack sockets. */ + both IPv4 and IPv6 connections, but :: is the preferred style. This usually + creates |num_listeners| sockets, but possibly 2 * |num_listeners| on systems + which support IPv6, but not dualstack sockets. 
*/ /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, int rcv_buf_size, int snd_buf_size, - GrpcUdpHandlerFactory* handler_factory); + GrpcUdpHandlerFactory* handler_factory, + size_t num_listeners); void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done); diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 3058e87bea3..d167c0131f3 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -40,6 +40,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/socket_factory_posix.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "test/core/util/test_config.h" #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) @@ -55,6 +56,8 @@ static int g_number_of_starts = 0; int rcv_buf_size = 1024; int snd_buf_size = 1024; +static int g_num_listeners = 1; + class TestGrpcUdpHandler : public GrpcUdpHandler { public: TestGrpcUdpHandler(grpc_fd* emfd, void* user_data) @@ -75,6 +78,7 @@ class TestGrpcUdpHandler : public GrpcUdpHandler { g_number_of_reads++; g_number_of_bytes_read += static_cast(byte_count); + gpr_log(GPR_DEBUG, "receive %zu on handler %p", byte_count, this); GPR_ASSERT(GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); gpr_mu_unlock(g_mu); @@ -213,7 +217,8 @@ static void test_no_op_with_port(void) { resolved_addr.len = static_cast(sizeof(struct sockaddr_in)); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, &handler_factory)); + snd_buf_size, &handler_factory, + g_num_listeners)); grpc_udp_server_destroy(s, nullptr); @@ -244,9 +249,10 @@ static void test_no_op_with_port_and_socket_factory(void) { resolved_addr.len = static_cast(sizeof(struct 
sockaddr_in)); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, &handler_factory)); - GPR_ASSERT(socket_factory->number_of_socket_calls == 1); - GPR_ASSERT(socket_factory->number_of_bind_calls == 1); + snd_buf_size, &handler_factory, + g_num_listeners)); + GPR_ASSERT(socket_factory->number_of_socket_calls == g_num_listeners); + GPR_ASSERT(socket_factory->number_of_bind_calls == g_num_listeners); grpc_udp_server_destroy(s, nullptr); @@ -271,15 +277,16 @@ static void test_no_op_with_port_and_start(void) { resolved_addr.len = static_cast(sizeof(struct sockaddr_in)); addr->sin_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, &handler_factory)); + snd_buf_size, &handler_factory, + g_num_listeners)); grpc_udp_server_start(s, nullptr, 0, nullptr); - GPR_ASSERT(g_number_of_starts == 1); + GPR_ASSERT(g_number_of_starts == g_num_listeners); grpc_udp_server_destroy(s, nullptr); /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners); shutdown_and_destroy_pollset(); } @@ -304,7 +311,8 @@ static void test_receive(int number_of_clients) { resolved_addr.len = static_cast(sizeof(struct sockaddr_storage)); addr->ss_family = AF_INET; GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, - snd_buf_size, &handler_factory)); + snd_buf_size, &handler_factory, + g_num_listeners)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); @@ -347,13 +355,16 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. 
*/ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == g_num_listeners); shutdown_and_destroy_pollset(); } int main(int argc, char** argv) { grpc_test_init(argc, argv); grpc_init(); + if (grpc_is_socket_reuse_port_supported()) { + g_num_listeners = 10; + } { grpc_core::ExecCtx exec_ctx; g_pollset = static_cast(gpr_zalloc(grpc_pollset_size()));