diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 4a97f3353d9..946846a8b87 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -21,6 +21,10 @@ #define _GNU_SOURCE #endif +#ifndef SO_RXQ_OVFL +#define SO_RXQ_OVFL 40 +#endif + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET @@ -280,11 +284,10 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, /* Prepare a recently-created socket for listening. */ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, - const grpc_resolved_address* addr) { + const grpc_resolved_address* addr, int rcv_buf_size, + int snd_buf_size) { grpc_resolved_address sockname_temp; struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; - /* Set send/receive socket buffers to 1 MB */ - int buffer_size_bytes = 1024 * 1024; if (fd < 0) { goto error; @@ -325,18 +328,25 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, goto error; } - if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { + if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", - buffer_size_bytes); + snd_buf_size); goto error; } - if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { + if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", - buffer_size_bytes); + rcv_buf_size); goto error; } + { + int get_overflow = 1; + if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, + sizeof(get_overflow))) { + gpr_log(GPR_INFO, "Failed to set socket overflow support"); + } + } return grpc_sockaddr_get_port(&sockname_temp); error: @@ -451,6 +461,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, @@ -460,7 +471,8 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, char* addr_str; char* name; - port = prepare_socket(s->socket_factory, fd, addr); + port = + prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); if (port >= 0) { grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); @@ -495,6 +507,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, @@ -545,8 +558,9 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, - write_cb, orphan_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, + read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -569,7 +583,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, + read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index a469ab9be57..c1aa49f15dd 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -68,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index dc1248bc1c9..09f02830138 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -51,6 +51,9 @@ static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; static int g_number_of_starts = 0; +int rcv_buf_size = 1024; +int snd_buf_size = 1024; + static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } static bool on_read(grpc_fd* emfd) { @@ -177,8 +180,9 @@ static void test_no_op_with_port(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_destroy(s, nullptr); @@ -207,8 +211,9 @@ static void test_no_op_with_port_and_socket_factory(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -233,8 +238,9 @@ static void test_no_op_with_port_and_start(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_start(s, nullptr, 0, nullptr); GPR_ASSERT(g_number_of_starts == 1); @@ -265,8 +271,9 @@ static void test_receive(int number_of_clients) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0);