diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 68ab9355ca3..5c82b258fb0 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -47,6 +47,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -71,14 +72,22 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + // To be scheduled on another thread to actually read/write. + grpc_closure do_read_closure; + grpc_closure do_write_closure; + grpc_closure notify_on_write_closure; // True if orphan_cb is trigered. bool orphan_notified; + // True if grpc_fd_notify_on_write() is called after on_write() call. + bool notify_on_write_armed; + // True if fd has been shutdown. + bool already_shutdown; struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd* fd; + grpc_udp_listener* sp; gpr_mu* server_mu; }; @@ -144,8 +153,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args, grpc_error* error) { struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; + grpc_udp_listener* sp = shutdown_args->sp; + gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd); gpr_mu_lock(shutdown_args->server_mu); - grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); + grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error)); + sp->already_shutdown = true; + if (!sp->notify_on_write_armed) { + // Re-arm write notification to notify listener with error. This is + // necessary to decrement active_ports. + sp->notify_on_write_armed = true; + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } @@ -161,6 +179,7 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { gpr_mu_destroy(&s->mu); + gpr_log(GPR_DEBUG, "Destroy all listeners."); while (s->head) { grpc_udp_listener* sp = s->head; s->head = sp->next; @@ -207,9 +226,10 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. Because at this point, all listening ports * have been shutdown already, no need to shutdown again.*/ - GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); + gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); } @@ -233,13 +253,14 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, s->shutdown_complete = on_done; + gpr_log(GPR_DEBUG, "start to destroy udp_server"); /* shutdown all fd's */ if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); struct shutdown_fd_args* args = (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); - args->fd = sp->emfd; + args->sp = sp; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); @@ -329,6 +350,28 @@ error: return -1; } +static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast(arg); + GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE); + // TODO(danzh): the reason we hold server->mu here is merely to prevent fd + // shutdown while we are reading. However, it blocks do_write(). Switch to + // read lock if available. + gpr_mu_lock(&sp->server->mu); + /* Tell the registered callback that data is available to read. */ + if (!sp->already_shutdown && + sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) { + // There maybe more packets to read. Schedule + // read_more_cb_ closure to run after finishing this event loop. + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + } else { + // Finish reading all the packets, re-arm the notification event so we can + // get another chance to read. + // Or fd already shutdown, re-arm to get a notification with shutdown error. + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + } + gpr_mu_unlock(&sp->server->mu); +} + /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; @@ -344,12 +387,40 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { return; } - /* Tell the registered callback that data is available to read. */ - GPR_ASSERT(sp->read_cb); - sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data); + // Schedule actual read in another thread. + GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&sp->server->mu); +} + +// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface. +void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast(arg); + gpr_mu_lock(&sp->server->mu); + if (!sp->notify_on_write_armed) { + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + sp->notify_on_write_armed = true; + } + gpr_mu_unlock(&sp->server->mu); +} - /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); +static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = reinterpret_cast(arg); + gpr_mu_lock(&(sp->server->mu)); + if (sp->already_shutdown) { + // If fd has been shutdown, don't write any more and re-arm notification. + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + } else { + sp->notify_on_write_armed = false; + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper, + arg, grpc_schedule_on_exec_ctx); + sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data, + &sp->notify_on_write_closure); + } gpr_mu_unlock(&sp->server->mu); } @@ -367,12 +438,11 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { return; } - /* Tell the registered callback that the socket is writeable. */ - GPR_ASSERT(sp->write_cb); - sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data); + // Schedule actual write in another thread. + GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); - /* Re-arm the notification event so we get another chance to write. */ - grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE); gpr_mu_unlock(&sp->server->mu); } @@ -409,6 +479,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; sp->orphan_notified = false; + sp->already_shutdown = false; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -533,6 +604,7 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); + sp->notify_on_write_armed = true; grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); /* Registered for both read and write callbacks: increment active_ports diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index bca0f049fb3..cf9a9e57d87 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -34,13 +34,16 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; -/* Called when data is available to read from the socket. */ -typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, +/* Called when data is available to read from the socket. + * Return true if there is more data to read from fd. */ +typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data); -/* Called when the socket is writeable. */ +/* Called when the socket is writeable. The given closure should be scheduled + * when the socket becomes blocked next time. */ typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, - void* user_data); + void* user_data, + grpc_closure* notify_on_write_closure); /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx, diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 803f017106e..12c9219c30f 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -50,7 +50,7 @@ static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; -static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { +static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { char read_buffer[512]; ssize_t byte_count; @@ -64,9 +64,11 @@ static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); + return false; } -static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) { +static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data, + grpc_closure* notify_on_write_closure) { gpr_mu_lock(g_mu); g_number_of_writes++; @@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, grpc_closure* closure, void* user_data) { gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d", grpc_fd_wrapped_fd(emfd)); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); g_number_of_orphan_calls++; } @@ -226,7 +229,6 @@ static void test_receive(int number_of_clients) { int clifd, svrfd; grpc_udp_server* s = grpc_udp_server_create(nullptr); int i; - int number_of_reads_before; grpc_millis deadline; grpc_pollset* pollsets[1]; LOG_TEST("test_receive"); @@ -256,7 +258,7 @@ static void test_receive(int number_of_clients) { deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10)); - number_of_reads_before = g_number_of_reads; + int number_of_bytes_read_before = g_number_of_bytes_read; /* Create a socket, send a packet to the UDP server. */ clifd = socket(addr->ss_family, SOCK_DGRAM, 0); GPR_ASSERT(clifd >= 0); @@ -273,7 +275,6 @@ static void test_receive(int number_of_clients) { grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } - GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1); close(clifd); } GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients); @@ -286,9 +287,6 @@ static void test_receive(int number_of_clients) { /* The server had a single FD, which is orphaned exactly once in * * grpc_udp_server_destroy. */ GPR_ASSERT(g_number_of_orphan_calls == 1); - - /* The write callback should have fired a few times. */ - GPR_ASSERT(g_number_of_writes > 0); } static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,