diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 3b23b47d4f1..02a194c9825 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -76,8 +76,10 @@ struct grpc_udp_listener { grpc_udp_server *server; grpc_resolved_address addr; grpc_closure read_closure; + grpc_closure write_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; + grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; struct grpc_udp_listener *next; @@ -304,9 +306,33 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } +static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_udp_listener *sp = arg; + + gpr_mu_lock(&(sp->server->mu)); + if (error != GRPC_ERROR_NONE) { + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(exec_ctx, sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb); + sp->write_cb(exec_ctx, sp->emfd); + + /* Re-arm the notification event so we get another chance to write. */ + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + gpr_mu_unlock(&sp->server->mu); +} + static int add_socket_to_server(grpc_udp_server *s, int fd, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int port; @@ -333,6 +359,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp->emfd = grpc_fd_create(fd, name); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->read_cb = read_cb; + sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); @@ -345,6 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int allocated_port1 = -1; @@ -391,7 +419,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -413,7 +442,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { addr = &addr4_copy; } - allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port2 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -451,6 +481,10 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_closure_init(&sp->write_closure, on_write, sp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + s->active_ports++; sp = sp->next; } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index f3c466a031d..ce068cbf04c 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -49,6 +49,10 @@ typedef struct grpc_udp_server grpc_udp_server; typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, struct grpc_server *server); +/* Called when the socket is writeable. */ +typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, + grpc_fd *emfd); + /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd); @@ -75,6 +79,7 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 9ec6d7bf207..ba7a52ea21c 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -58,6 +58,7 @@ static grpc_pollset *g_pollset; static gpr_mu *g_mu; static int g_number_of_reads = 0; +static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; @@ -78,6 +79,15 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, gpr_mu_unlock(g_mu); } +static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd) { + gpr_mu_lock(g_mu); + g_number_of_writes++; + + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + gpr_mu_unlock(g_mu); +} + static void on_fd_orphaned(grpc_fd *emfd) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); @@ -111,8 +121,8 @@ static void test_no_op_with_port(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT( - grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_exec_ctx_finish(&exec_ctx); @@ -132,8 +142,8 @@ static void test_no_op_with_port_and_start(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT( - grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL); @@ -164,8 +174,8 @@ static void test_receive(int number_of_clients) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; - GPR_ASSERT( - grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, + on_fd_orphaned)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); @@ -212,6 +222,9 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned once in * * deactivated_all_ports, and once in grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 2); + + /* The write callback should have fired a few times. */ + GPR_ASSERT(g_number_of_writes > 0); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,