diff --git a/src/core/lib/iomgr/tcp_custom.h b/src/core/lib/iomgr/tcp_custom.h index 784ef842221..9c81e85012e 100644 --- a/src/core/lib/iomgr/tcp_custom.h +++ b/src/core/lib/iomgr/tcp_custom.h @@ -24,6 +24,8 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/sockaddr.h" +#define GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT (0x00000010u) + typedef struct grpc_tcp_listener grpc_tcp_listener; typedef struct grpc_custom_tcp_connect grpc_custom_tcp_connect; diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc index 2df94adb5fc..bd4becba127 100644 --- a/src/core/lib/iomgr/tcp_server_custom.cc +++ b/src/core/lib/iomgr/tcp_server_custom.cc @@ -31,6 +31,7 @@ #include "src/core/lib/iomgr/iomgr_custom.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_custom.h" #include "src/core/lib/iomgr/tcp_server.h" @@ -72,6 +73,7 @@ struct grpc_tcp_server { grpc_closure* shutdown_complete; bool shutdown; + bool so_reuseport; grpc_resource_quota* resource_quota; }; @@ -80,9 +82,19 @@ static grpc_error* tcp_server_create(grpc_closure* shutdown_complete, const grpc_channel_args* args, grpc_tcp_server** server) { grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server)); + s->so_reuseport = grpc_is_socket_reuse_port_supported(); s->resource_quota = grpc_resource_quota_create(nullptr); for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) { - if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { + if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_INTEGER) { + s->so_reuseport = grpc_is_socket_reuse_port_supported() && + (args->args[i].value.integer != 0); + } else { + gpr_free(s); + return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT + " must be an integer"); + } + } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_POINTER) { grpc_resource_quota_unref_internal(s->resource_quota); s->resource_quota = grpc_resource_quota_ref_internal( @@ -280,9 +292,15 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, grpc_error* error; grpc_resolved_address sockname_temp; - // The last argument to uv_tcp_bind is flags + // NOTE(lidiz) The last argument is flags unused by other implementations. + // Use it to specify SO_REUSEPORT for Python IO managers. + int flags = 0; + if (s->so_reuseport) { + flags |= GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT; + } + error = grpc_custom_socket_vtable->bind(socket, (grpc_sockaddr*)addr->addr, - addr->len, 0); + addr->len, flags); if (error != GRPC_ERROR_NONE) { return error; } diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 121efb02d65..b44ce35d416 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -122,11 +122,11 @@ cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil return grpc_error_none() -def _asyncio_apply_socket_options(object socket): - # TODO(https://github.com/grpc/grpc/issues/20667) - # Connects the so_reuse_port option to channel arguments - socket.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) - socket.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) +def _asyncio_apply_socket_options(object s, int flags): + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) + if GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT & flags: + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1) + s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) cdef grpc_error* asyncio_socket_bind( @@ -142,7 +142,7 @@ cdef grpc_error* asyncio_socket_bind( family = native_socket.AF_INET socket = native_socket.socket(family=family) - _asyncio_apply_socket_options(socket) + _asyncio_apply_socket_options(socket, flags) socket.bind((host, port)) except IOError as io_error: return socket_error("bind", str(io_error)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi index 11ac401bfe4..7bf90c38884 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi @@ -54,6 +54,8 @@ cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": grpc_error* error); cdef extern from "src/core/lib/iomgr/tcp_custom.h": + cdef int GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT + struct grpc_custom_socket: void* impl # We don't care about the rest of the fields diff --git a/src/python/grpcio_tests/tests_aio/unit/channel_argument_test.py b/src/python/grpcio_tests/tests_aio/unit/channel_argument_test.py index 6037bade998..e90d8b09505 100644 --- a/src/python/grpcio_tests/tests_aio/unit/channel_argument_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/channel_argument_test.py @@ -89,7 +89,6 @@ class TestChannelArgument(AioTestBase): async def setUp(self): random.seed(_RANDOM_SEED) - @unittest.skip('https://github.com/grpc/grpc/issues/20667') @unittest.skipIf(platform.system() == 'Windows', 'SO_REUSEPORT only available in Linux-like OS.') async def test_server_so_reuse_port_is_set_properly(self):