Modify udp_server to do asynchronous read and write and only re-arm

notification when needed.

This change prevents event loop thread from doing actual I/O work which
might take long time.
pull/13554/head
Dan Zhang 7 years ago
parent f836c7e941
commit 0f7bc57e11
  1. 100
      src/core/lib/iomgr/udp_server.cc
  2. 11
      src/core/lib/iomgr/udp_server.h
  3. 14
      test/core/iomgr/udp_server_test.cc

@ -47,6 +47,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -71,14 +72,22 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
grpc_closure notify_on_write_closure;
// True if orphan_cb is trigered.
bool orphan_notified;
// True if grpc_fd_notify_on_write() is called after on_write() call.
bool notify_on_write_armed;
// True if fd has been shutdown.
bool already_shutdown;
struct grpc_udp_listener* next;
};
struct shutdown_fd_args {
grpc_fd* fd;
grpc_udp_listener* sp;
gpr_mu* server_mu;
};
@ -144,8 +153,17 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
grpc_error* error) {
struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
grpc_udp_listener* sp = shutdown_args->sp;
gpr_log(GPR_DEBUG, "shutdown fd %d", sp->fd);
gpr_mu_lock(shutdown_args->server_mu);
grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error));
grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_REF(error));
sp->already_shutdown = true;
if (!sp->notify_on_write_armed) {
// Re-arm write notification to notify listener with error. This is
// necessary to decrement active_ports.
sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
}
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
}
@ -161,6 +179,7 @@ static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
gpr_mu_destroy(&s->mu);
gpr_log(GPR_DEBUG, "Destroy all listeners.");
while (s->head) {
grpc_udp_listener* sp = s->head;
s->head = sp->next;
@ -207,9 +226,10 @@ static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
gpr_log(GPR_DEBUG, "Orphan fd %d", sp->fd);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
}
@ -233,13 +253,14 @@ void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
s->shutdown_complete = on_done;
gpr_log(GPR_DEBUG, "start to destroy udp_server");
/* shutdown all fd's */
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
struct shutdown_fd_args* args =
(struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
args->fd = sp->emfd;
args->sp = sp;
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
@ -329,6 +350,28 @@ error:
return -1;
}
static void do_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
GPR_ASSERT(sp->read_cb && error == GRPC_ERROR_NONE);
// TODO(danzh): the reason we hold server->mu here is merely to prevent fd
// shutdown while we are reading. However, it blocks do_write(). Switch to
// read lock if available.
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
if (!sp->already_shutdown &&
sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data)) {
// There maybe more packets to read. Schedule
// read_more_cb_ closure to run after finishing this event loop.
GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
} else {
// Finish reading all the packets, re-arm the notification event so we can
// get another chance to read.
// Or fd already shutdown, re-arm to get a notification with shutdown error.
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
}
gpr_mu_unlock(&sp->server->mu);
}
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
@ -344,12 +387,40 @@ static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
return;
}
/* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb);
sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data);
// Schedule actual read in another thread.
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_read_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
}
// Wrapper of grpc_fd_notify_on_write() with a grpc_closure callback interface.
void fd_notify_on_write_wrapper(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&sp->server->mu);
if (!sp->notify_on_write_armed) {
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
sp->notify_on_write_armed = true;
}
gpr_mu_unlock(&sp->server->mu);
}
/* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
static void do_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
gpr_mu_lock(&(sp->server->mu));
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
} else {
sp->notify_on_write_armed = false;
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb && error == GRPC_ERROR_NONE);
GRPC_CLOSURE_INIT(&sp->notify_on_write_closure, fd_notify_on_write_wrapper,
arg, grpc_schedule_on_exec_ctx);
sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data,
&sp->notify_on_write_closure);
}
gpr_mu_unlock(&sp->server->mu);
}
@ -367,12 +438,11 @@ static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
return;
}
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb);
sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data);
// Schedule actual write in another thread.
GRPC_CLOSURE_INIT(&sp->do_write_closure, do_write, arg,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG));
/* Re-arm the notification event so we get another chance to write. */
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
GRPC_CLOSURE_SCHED(exec_ctx, &sp->do_write_closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&sp->server->mu);
}
@ -409,6 +479,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@ -533,6 +604,7 @@ void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
sp->notify_on_write_armed = true;
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
/* Registered for both read and write callbacks: increment active_ports

@ -34,13 +34,16 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
typedef bool (*grpc_udp_server_read_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
void* user_data);
/* Called when the socket is writeable. */
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
void* user_data);
void* user_data,
grpc_closure* notify_on_write_closure);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx* exec_ctx,

@ -50,7 +50,7 @@ static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
static bool on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
char read_buffer[512];
ssize_t byte_count;
@ -64,9 +64,11 @@ static void on_read(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
return false;
}
static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data) {
static void on_write(grpc_exec_ctx* exec_ctx, grpc_fd* emfd, void* user_data,
grpc_closure* notify_on_write_closure) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
@ -79,6 +81,7 @@ static void on_fd_orphaned(grpc_exec_ctx* exec_ctx, grpc_fd* emfd,
grpc_closure* closure, void* user_data) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}
@ -226,7 +229,6 @@ static void test_receive(int number_of_clients) {
int clifd, svrfd;
grpc_udp_server* s = grpc_udp_server_create(nullptr);
int i;
int number_of_reads_before;
grpc_millis deadline;
grpc_pollset* pollsets[1];
LOG_TEST("test_receive");
@ -256,7 +258,7 @@ static void test_receive(int number_of_clients) {
deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(10));
number_of_reads_before = g_number_of_reads;
int number_of_bytes_read_before = g_number_of_bytes_read;
/* Create a socket, send a packet to the UDP server. */
clifd = socket(addr->ss_family, SOCK_DGRAM, 0);
GPR_ASSERT(clifd >= 0);
@ -273,7 +275,6 @@ static void test_receive(int number_of_clients) {
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
close(clifd);
}
GPR_ASSERT(g_number_of_bytes_read == 5 * number_of_clients);
@ -286,9 +287,6 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned exactly once in *
* grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 1);
/* The write callback should have fired a few times. */
GPR_ASSERT(g_number_of_writes > 0);
}
static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,

Loading…
Cancel
Save