Merge pull request #9558 from rjshade/add_on_write_to_udp_server

Add an on_write callback to the UDP server.
pull/9589/head
Robbie Shade 8 years ago committed by GitHub
commit 1d0eb4c89a
  1. 38
      src/core/lib/iomgr/udp_server.c
  2. 5
      src/core/lib/iomgr/udp_server.h
  3. 25
      test/core/iomgr/udp_server_test.c

@ -76,8 +76,10 @@ struct grpc_udp_listener {
grpc_udp_server *server;
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
struct grpc_udp_listener *next;
@ -304,9 +306,33 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_unlock(&sp->server->mu);
}
static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_udp_listener *sp = arg;
gpr_mu_lock(&(sp->server->mu));
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(exec_ctx, sp->server);
} else {
gpr_mu_unlock(&sp->server->mu);
}
return;
}
/* Tell the registered callback that the socket is writeable. */
GPR_ASSERT(sp->write_cb);
sp->write_cb(exec_ctx, sp->emfd);
/* Re-arm the notification event so we get another chance to write. */
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
gpr_mu_unlock(&sp->server->mu);
}
static int add_socket_to_server(grpc_udp_server *s, int fd,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener *sp;
int port;
@ -333,6 +359,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->emfd = grpc_fd_create(fd, name);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
@ -345,6 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
int grpc_udp_server_add_port(grpc_udp_server *s,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
grpc_udp_listener *sp;
int allocated_port1 = -1;
@ -391,7 +419,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
&dsmode, &fd));
allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
allocated_port1 =
add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@ -413,7 +442,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
addr = &addr4_copy;
}
allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb);
allocated_port2 =
add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@ -451,6 +481,10 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
grpc_closure_init(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
s->active_ports++;
sp = sp->next;
}

@ -49,6 +49,10 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
/* Called when the socket is writeable. */
typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx,
grpc_fd *emfd);
/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
@ -75,6 +79,7 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index);
int grpc_udp_server_add_port(grpc_udp_server *s,
const grpc_resolved_address *addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,

@ -58,6 +58,7 @@
static grpc_pollset *g_pollset;
static gpr_mu *g_mu;
static int g_number_of_reads = 0;
static int g_number_of_writes = 0;
static int g_number_of_bytes_read = 0;
static int g_number_of_orphan_calls = 0;
@ -78,6 +79,15 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
gpr_mu_unlock(g_mu);
}
static void on_write(grpc_exec_ctx *exec_ctx, grpc_fd *emfd) {
gpr_mu_lock(g_mu);
g_number_of_writes++;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
static void on_fd_orphaned(grpc_fd *emfd) {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd));
@ -111,8 +121,8 @@ static void test_no_op_with_port(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(
grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
on_fd_orphaned));
grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx);
@ -132,8 +142,8 @@ static void test_no_op_with_port_and_start(void) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_in);
addr->sin_family = AF_INET;
GPR_ASSERT(
grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
on_fd_orphaned));
grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL);
@ -164,8 +174,8 @@ static void test_receive(int number_of_clients) {
memset(&resolved_addr, 0, sizeof(resolved_addr));
resolved_addr.len = sizeof(struct sockaddr_storage);
addr->ss_family = AF_INET;
GPR_ASSERT(
grpc_udp_server_add_port(s, &resolved_addr, on_read, on_fd_orphaned));
GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write,
on_fd_orphaned));
svrfd = grpc_udp_server_get_fd(s, 0);
GPR_ASSERT(svrfd >= 0);
@ -212,6 +222,9 @@ static void test_receive(int number_of_clients) {
/* The server had a single FD, which is orphaned once in *
* deactivated_all_ports, and once in grpc_udp_server_destroy. */
GPR_ASSERT(g_number_of_orphan_calls == 2);
/* The write callback should have fired a few times. */
GPR_ASSERT(g_number_of_writes > 0);
}
static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,

Loading…
Cancel
Save