diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 90fae6b94fc..bdf26d40970 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -89,7 +89,6 @@ struct grpc_udp_listener { /* the overall server */ struct grpc_udp_server { gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -118,7 +117,6 @@ struct grpc_udp_server { grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; @@ -130,15 +128,15 @@ grpc_udp_server *grpc_udp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } - gpr_cv_destroy(&s->cv); gpr_mu_destroy(&s->mu); while (s->head) { grpc_udp_listener *sp = s->head; s->head = sp->next; - gpr_free(sp); } @@ -162,8 +160,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - size_t i; - /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -175,6 +171,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->head) { grpc_udp_listener *sp; for (sp = s->head; sp; sp = sp->next) { + // grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); + sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; @@ -193,7 +191,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { - size_t i; grpc_udp_listener* sp; gpr_mu_lock(&s->mu); @@ -286,6 +283,7 @@ error: static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_udp_listener *sp = arg; + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); @@ -302,6 +300,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + gpr_mu_unlock(&sp->server->mu); } static int add_socket_to_server(grpc_udp_server *s, int fd, @@ -349,7 +348,6 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, grpc_udp_listener* sp; int allocated_port1 = -1; int allocated_port2 = -1; - unsigned i; int fd; grpc_dualstack_mode dsmode; struct sockaddr_in6 addr6_v4mapped; @@ -426,10 +424,22 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { + grpc_udp_listener *sp; + if (port_index >= s->nports) { + return -1; + } + + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + --port_index; + } + return sp->fd; +} + void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_server *server) { - size_t i, j; + size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 33c5ce11cda..70d0f19454e 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -59,7 +59,7 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, struct grpc_server *server); -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index a959a7e07fa..6667581ad19 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -134,7 +134,7 @@ static void test_no_op_with_port_and_start(void) { grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == 2); } static void test_receive(int number_of_clients) { @@ -199,7 +199,7 @@ static void test_receive(int number_of_clients) { grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == 2); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,