From 5c708f4aeee99261ed7851304f4bc74c46a91831 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 30 Mar 2017 17:44:26 -0400 Subject: [PATCH 1/2] modify udp_server --- src/core/lib/iomgr/udp_server.c | 38 +++++++++++++++++++++++-------- src/core/lib/iomgr/udp_server.h | 4 +++- test/core/iomgr/udp_server_test.c | 14 ++++++------ 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 60579e18ba5..ca283d034f9 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -79,10 +79,15 @@ struct grpc_udp_listener { grpc_resolved_address addr; grpc_closure read_closure; grpc_closure write_closure; + // To be called when corresponding QuicGrpcServer closes all active + // connections. + grpc_closure orphan_fd_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + // True if orphan_cb is trigered. + bool orphan_notified; struct grpc_udp_listener *next; }; @@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { return s; } +static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) { + grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error)); +} + +static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + // No-op. +} + static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->shutdown_complete != NULL) { grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); @@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { grpc_closure_init(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); - - /* Call the orphan_cb to signal that the FD is about to be closed and - * should no longer be used. */ - GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); - + if (!sp->orphan_notified) { + /* Call the orphan_cb to signal that the FD is about to be closed and + * should no longer be used. Because at this point, all listening ports + * have been shutdown already, no need to shutdown again.*/ + grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + grpc_schedule_on_exec_ctx); + GPR_ASSERT(sp->orphan_cb); + sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, + sp->server->user_data); + } grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); } @@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); - grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Server destroyed")); + grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd, + grpc_schedule_on_exec_ctx); + sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, + sp->server->user_data); + sp->orphan_notified = true; } gpr_mu_unlock(&s->mu); } else { @@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->orphan_notified = false; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 9df3fe4d1f0..2a72638709b 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -55,7 +55,9 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, - grpc_fd *emfd, void *user_data); + grpc_fd *emfd, + grpc_closure* shutdown_fd_callback, + void *user_data); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args); diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 12d84063239..9cc0ea94983 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) { } static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, - void *user_data) { + grpc_closure* closure, void *user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); g_number_of_orphan_calls++; @@ -228,9 +228,9 @@ static void test_no_op_with_port_and_start(void) { grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_exec_ctx_finish(&exec_ctx); - /* The server had a single FD, which is orphaned once in * - * deactivated_all_ports, and once in grpc_udp_server_destroy. */ - GPR_ASSERT(g_number_of_orphan_calls == 2); + /* The server had a single FD, which is orphaned exactly once in * + * grpc_udp_server_destroy. */ + GPR_ASSERT(g_number_of_orphan_calls == 1); } static void test_receive(int number_of_clients) { @@ -297,9 +297,9 @@ static void test_receive(int number_of_clients) { grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_exec_ctx_finish(&exec_ctx); - /* The server had a single FD, which is orphaned once in * - * deactivated_all_ports, and once in grpc_udp_server_destroy. */ - GPR_ASSERT(g_number_of_orphan_calls == 2); + /* The server had a single FD, which is orphaned exactly once in * + * grpc_udp_server_destroy. */ + GPR_ASSERT(g_number_of_orphan_calls == 1); /* The write callback should have fired a few times. */ GPR_ASSERT(g_number_of_writes > 0); From 2d38b05671ca0c6006c775ccb8cbb945118a794c Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 6 Apr 2017 13:09:34 -0400 Subject: [PATCH 2/2] clang format --- src/core/lib/iomgr/udp_server.h | 2 +- test/core/iomgr/udp_server_test.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 2a72638709b..8006037644d 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -56,7 +56,7 @@ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, - grpc_closure* shutdown_fd_callback, + grpc_closure *shutdown_fd_callback, void *user_data); /* Create a server, initially not bound to any ports */ diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 9cc0ea94983..1f1696a7a7a 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -91,7 +91,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, void *user_data) { } static void on_fd_orphaned(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, - grpc_closure* closure, void *user_data) { + grpc_closure *closure, void *user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); g_number_of_orphan_calls++;