Add grpc_server argument to grpc_udp_server_start

pull/3488/head
Robbie Shade 9 years ago
parent 2d73f05d33
commit 147fe701a0
  1. 8
      src/core/iomgr/udp_server.c
  2. 10
      src/core/iomgr/udp_server.h
  3. 8
      test/core/iomgr/udp_server_test.c

@ -117,6 +117,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets; grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */ /* number of pollsets in the pollsets array */
size_t pollset_count; size_t pollset_count;
/* The parent grpc server */
grpc_server* grpc_server;
}; };
grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *grpc_udp_server_create(void) {
@ -276,7 +278,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* Tell the registered callback that data is available to read. */ /* Tell the registered callback that data is available to read. */
GPR_ASSERT(sp->read_cb); GPR_ASSERT(sp->read_cb);
sp->read_cb(sp->fd); sp->read_cb(sp->fd, sp->server->grpc_server);
/* Re-arm the notification event so we get another chance to read. */ /* Re-arm the notification event so we get another chance to read. */
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
@ -402,11 +404,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) {
} }
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_pollset **pollsets, size_t pollset_count) { grpc_pollset **pollsets, size_t pollset_count,
grpc_server *grpc_server) {
size_t i, j; size_t i, j;
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu);
GPR_ASSERT(s->active_ports == 0); GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets; s->pollsets = pollsets;
s->grpc_server = grpc_server;
for (i = 0; i < s->nports; i++) { for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) { for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);

@ -36,18 +36,22 @@
#include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/endpoint.h"
/* Forward decl of grpc_server */
typedef struct grpc_server grpc_server;
/* Forward decl of grpc_udp_server */ /* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server; typedef struct grpc_udp_server grpc_udp_server;
/* Called when data is available to read from the socket. */ /* Called when data is available to read from the socket. */
typedef void (*grpc_udp_server_read_cb)(int fd); typedef void (*grpc_udp_server_read_cb)(int fd, grpc_server* server);
/* Create a server, initially not bound to any ports */ /* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void); grpc_udp_server *grpc_udp_server_create(void);
/* Start listening to bound ports */ /* Start listening to bound ports */
void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, void grpc_udp_server_start(grpc_exec_ctx *exec_ctx grpc_udp_server *udp_server,
grpc_pollset **pollsets, size_t pollset_count); grpc_pollset **pollsets, size_t pollset_count,
grpc_server *grpc_server);
int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);

@ -49,7 +49,7 @@ static grpc_pollset g_pollset;
static int g_number_of_reads = 0; static int g_number_of_reads = 0;
static int g_number_of_bytes_read = 0; static int g_number_of_bytes_read = 0;
static void on_read(int fd) { static void on_read(int fd, grpc_server *grpc_server) {
char read_buffer[512]; char read_buffer[512];
ssize_t byte_count; ssize_t byte_count;
@ -74,7 +74,7 @@ static void test_no_op_with_start(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server *s = grpc_udp_server_create(); grpc_udp_server *s = grpc_udp_server_create();
LOG_TEST("test_no_op_with_start"); LOG_TEST("test_no_op_with_start");
grpc_udp_server_start(&exec_ctx, s, NULL, 0); grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL);
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
@ -105,7 +105,7 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr), GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read)); on_read));
grpc_udp_server_start(&exec_ctx, s, NULL, 0); grpc_udp_server_start(&exec_ctx, s, NULL, 0, NULL);
grpc_udp_server_destroy(&exec_ctx, s, NULL); grpc_udp_server_destroy(&exec_ctx, s, NULL);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
@ -137,7 +137,7 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(addr_len <= sizeof(addr)); GPR_ASSERT(addr_len <= sizeof(addr));
pollsets[0] = &g_pollset; pollsets[0] = &g_pollset;
grpc_udp_server_start(&exec_ctx, s, pollsets, 1); grpc_udp_server_start(&exec_ctx, s, pollsets, 1, NULL);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));

Loading…
Cancel
Save