From 76cfc6ac97cd542f331aff60aaa273fccdaed815 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 7 Apr 2016 18:32:44 -0700 Subject: [PATCH 01/11] Some comments --- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 3c8127e1a8c..306d312dc4e 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -756,9 +756,14 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker = pop_front_worker(p); if (specific_worker != NULL) { if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { + /* Prefer not to kick self. Push the worker to the end of the list and + * pop the one from front */ GPR_TIMER_MARK("kick_anonymous_not_self", 0); push_back_worker(p, specific_worker); specific_worker = pop_front_worker(p); + /* If there was only one worker on the pollset, we would get the same + * worker we pushed (the one set on current thread local) back. If so, + * kick it only if GRPC_POLLSET_CAN_KICK_SELF flag is set */ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { From 42b004a2a5f785094f6c9bccaf4090e2c7c6e9b5 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 8 Apr 2016 14:41:49 -0700 Subject: [PATCH 02/11] first cut of changes --- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 130 ++++++++++++++++--- src/core/lib/iomgr/ev_posix.c | 4 + src/core/lib/iomgr/ev_posix.h | 6 + src/core/lib/iomgr/tcp_server_posix.c | 29 ++++- src/core/lib/surface/server.c | 5 +- 5 files changed, 151 insertions(+), 23 deletions(-) diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 306d312dc4e..77a67d20078 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -126,6 +126,9 @@ struct grpc_fd { grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; + + /* The pollset that last noticed and notified that the fd is readable */ + grpc_pollset *read_notifier_pollset; }; /* Begin polling on an fd. @@ -147,7 +150,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. 
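   For example (editorial sketch of the call pattern this change introduces,
   matching the call sites later in this series): a poller that saw POLLIN
   while polling on behalf of pollset 'p' finishes with
     fd_end_poll(exec_ctx, &watcher, pfd.revents & POLLIN_CHECK,
                 pfd.revents & POLLOUT_CHECK, p);
   whereas error and timeout paths call fd_end_poll(exec_ctx, &watcher, 0, 0,
   NULL) so the fd's read_notifier_pollset is left untouched.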
*/ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, - int got_read, int got_write); + int got_read, int got_write, + grpc_pollset *read_notifier_pollset); /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd *fd); @@ -342,6 +346,7 @@ static grpc_fd *alloc_fd(int fd) { r->on_done_closure = NULL; r->closed = 0; r->released = 0; + r->read_notifier_pollset = NULL; gpr_mu_unlock(&r->mu); return r; } @@ -511,9 +516,17 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, "\t>> notify_on_locked: (fd:%d) CLOSURE_NOT_READY -> %p", + fd->fd, closure); /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> notify_on_locked: (fd:%d) CLOSURE_READY -> CLOSURE_NOT_READY " + "(enqueue: %p)", + fd->fd, closure); /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); @@ -532,19 +545,41 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { if (*st == CLOSURE_READY) { /* duplicate ready ==> ignore */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) CLOSURE_READY -> CLOSURE_READY (no " + "change)", + fd->fd); return 0; } else if (*st == CLOSURE_NOT_READY) { /* not ready, and not waiting ==> flag ready */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) CLOSURE_NOT_READY -> CLOSURE_READY", + fd->fd); *st = CLOSURE_READY; return 0; } else { /* waiting ==> queue closure */ + /* TODO (sreek): Remove following log line */ + gpr_log(GPR_INFO, + "\t>> set_ready_locked: (fd:%d) Enqueue %p -> CLOSURE_NOT_READY", + fd->fd, *st); grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); *st = CLOSURE_NOT_READY; return 1; } } +static void set_read_notifier_pollset_locked( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { + /* TODO(sreek): Remove the following log line */ + gpr_log(GPR_INFO, "\t>> Set read notifier (fd:%d): %p --> %p", fd->fd, + fd->read_notifier_pollset, read_notifier_pollset); + + fd->read_notifier_pollset = read_notifier_pollset; +} + static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -568,6 +603,18 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +/* Return the read-notifier pollset */ +static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { + grpc_pollset *notifier = NULL; + + gpr_mu_lock(&fd->mu); + notifier = fd->read_notifier_pollset; + gpr_mu_unlock(&fd->mu); + + return notifier; +} + static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, grpc_pollset_worker *worker, uint32_t read_mask, uint32_t write_mask, grpc_fd_watcher *watcher) { @@ -620,7 +667,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, } static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, - int got_read, int got_write) { + int got_read, int got_write, + grpc_pollset *read_notifier_pollset) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -653,11 +701,27 
@@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, watcher->prev->next = watcher->next; } if (got_read) { + /*TODO(sreek): Delete this log line */ + gpr_log(GPR_INFO, + "\t>> fd_end_poll(): GOT READ Calling set_ready_locked. fd: %d, " + "fd->read_closure: %p, " + "notifier_pollset: %p", + fd->fd, fd->read_closure, read_notifier_pollset); + if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { kick = 1; } + + if (read_notifier_pollset != NULL) { + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } } if (got_write) { + /*TODO(sreek): Delete this log line */ + gpr_log(GPR_INFO, + "\t>> fd_end_poll(): GOT WRITE set_ready_locked. fd: %d, " + "fd->write_closure: %p", + fd->fd, fd->write_closure); if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { kick = 1; } @@ -1208,11 +1272,11 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); } if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } else if (r == 0) { if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } else { if (pfd[0].revents & POLLIN_CHECK) { @@ -1222,10 +1286,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } if (nfds > 2) { + /* TODO(sreek): delete the following comment line */ + gpr_log( + GPR_INFO, + "\t>> basic_pollset_maybe_work_and_unlock(): fd->fd: %d, pollset: %p " + "is readable (calling fd_end_poll()) -------------------------------", + pfd[2].fd, pollset); fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, - pfd[2].revents & POLLOUT_CHECK); + pfd[2].revents & POLLOUT_CHECK, pollset); } else if (fd) { - fd_end_poll(exec_ctx, &fd_watcher, 0, 0); + fd_end_poll(exec_ctx, &fd_watcher, 0, 0, NULL); } } @@ -1361,11 +1431,11 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); } for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else if (r == 0) { for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -1376,11 +1446,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); continue; } + /*TODO(sree) - Delete this log line*/ + gpr_log(GPR_INFO, + "multipoll_with_poll_pollset(). fd: %d became redable. 
Pollset: " + "%p (calling fd_end_poll())*************", + pfds[i].fd, pollset); fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); + pfds[i].revents & POLLOUT_CHECK, pollset); } } @@ -1456,20 +1531,31 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { +static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, + grpc_pollset *read_notifier_pollset) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ gpr_mu_lock(&fd->mu); set_ready_locked(exec_ctx, fd, st); + + /* A non-NULL read_notifier_pollset means that the fd is readable. */ + if (read_notifier_pollset != NULL) { + /* Note: Since the fd might be a part of multiple pollsets, this might be + * called multiple times (for each time the fd becomes readable) and it is + * okay to set the fd's read-notifier pollset to anyone of these pollsets */ + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } + gpr_mu_unlock(&fd->mu); } -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->read_closure); +static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_pollset *notifier_pollset) { + set_ready(exec_ctx, fd, &fd->read_closure, notifier_pollset); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure); + set_ready(exec_ctx, fd, &fd->write_closure, NULL); } struct epoll_fd_list { @@ -1561,7 +1647,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } } - fd_end_poll(exec_ctx, &watcher, 0, 0); + fd_end_poll(exec_ctx, &watcher, 0, 0, NULL); } static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, @@ -1675,9 +1761,20 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } else { if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd); + /* TODO(sreek): Delete this once the issue #5470 is resolved */ + gpr_log( + GPR_INFO, + "\t>> multipoll_with_epoll_pollset: Calling " + "fd_become_readable(fd->fd: %d, pollset: %p) ++++++++++++", + fd->fd, pollset); + fd_become_readable(exec_ctx, fd, pollset); } if (write_ev || cancel) { + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, + "\t>> multipoll_with_epoll_pollset: Calling " + "fd_become_writable(fd: %d)", + fd->fd); fd_become_writable(exec_ctx, fd); } } @@ -1904,6 +2001,7 @@ static const grpc_event_engine_vtable vtable = { .fd_shutdown = fd_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, + .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 0eb95a2e091..af4126c900c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -83,6 +83,10 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, g_event_engine->fd_notify_on_write(exec_ctx, fd, closure); } +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd); +} + size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; } void 
grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d6..4cfa83e6a23 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -55,6 +55,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -137,6 +139,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); +/* Return the read notifier pollset from the fd */ +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); + /* pollset_posix functions */ /* Add an fd to a pollset */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index cfb52516845..03318151ccf 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -310,13 +310,20 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { grpc_tcp_listener *sp = arg; grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, sp->fd_index}; + grpc_pollset *read_notifier_pollset = NULL; grpc_fd *fdobj; - size_t i; if (!success) { goto error; } + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Getting read notifier"); + read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd); + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Got read notifier: %p", + read_notifier_pollset); + /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { struct sockaddr_storage addr; @@ -349,12 +356,22 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } fdobj = grpc_fd_create(fd, name); - /* TODO(ctiller): revise this when we have server-side sharding - of channels -- we certainly should not be automatically adding every - incoming channel to every pollset owned by the server */ - for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, sp->server->pollsets[i], fdobj); + + if (read_notifier_pollset == NULL) { + /* TODO(sreek): Check when this would happen - Ideally this should not + * happen. Remove the next log-line once this is resolved */ + gpr_log(GPR_INFO, "\t** *******!!! tcp_server_posix.on_read(): " + "read_notifier_pollset is NULL. 
!!!**********************"); + + gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); + goto error; } + + /* TODO(sreek): Delete the following log line */ + gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Adding fd %d *only* to pollset %p", + fd, read_notifier_pollset); + grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); + sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index ad8ee8c7a99..25b6886f241 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1018,7 +1018,6 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, const grpc_channel_args *args) { - size_t i; size_t num_registered_methods; size_t alloc; registered_method *rm; @@ -1033,11 +1032,15 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; + /* TODO(sreek): Delete this commented block once issue #5470 is resolved */ + /* + size_t i; for (i = 0; i < s->cq_count; i++) { memset(&op, 0, sizeof(op)); op.bind_pollset = grpc_cq_pollset(s->cqs[i]); grpc_transport_perform_op(exec_ctx, transport, &op); } + */ channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); From 47ef37a9ad495a09ffe4820c9b46963796ff3370 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 11 Apr 2016 18:59:23 -0700 Subject: [PATCH 03/11] test cases --- test/core/iomgr/fd_posix_test.c | 98 +++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index f97f33712eb..18cd825df00 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -518,6 +518,103 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } +typedef struct read_notifier_test_fd_context { + grpc_fd *fd; + bool is_cb_called; +} read_notifier_test_fd_context; + +static void read_notifier_test_callback( + grpc_exec_ctx *exec_ctx, void *arg /* (read_notifier_test_fd_context *) */, + bool success) { + read_notifier_test_fd_context *fd_context = arg; + grpc_fd *fd = fd_context->fd; + + /* Verify that the read notifier pollset is set */ + GPR_ASSERT(grpc_fd_get_read_notifier_pollset(exec_ctx, fd) != NULL); + fd_context->is_cb_called = true; +} + +/* sv MUST to be an array of size 2 */ +static void get_socket_pair(int sv[]) { + int flags = 0; + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + +static grpc_pollset *create_grpc_pollset(gpr_mu **mu) { + grpc_pollset *pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pollset, mu); + return pollset; +} + +static void free_grpc_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + grpc_closure destroyed; + grpc_closure_init(&destroyed, destroy_pollset, pollset); + grpc_pollset_shutdown(exec_ctx, pollset, &destroyed); + grpc_exec_ctx_finish(exec_ctx); + gpr_free(pollset); +} + +static void test_grpc_fd_read_notifier_pollset(void) { + grpc_fd *em_fd[2]; + read_notifier_test_fd_context fd_context; + int sv[2][2]; + char data; + ssize_t result; + int i; + 
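+ /* Note on the wait loop below: grpc_pollset_work() requires the caller to
+    hold the pollset mutex, so each iteration re-acquires the mutex, polls,
+    and then drops the mutex so grpc_exec_ctx_finish() can run the enqueued
+    read_notifier_test_callback; is_cb_called signals completion. */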
grpc_closure on_read_closure; + gpr_mu *mu; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + grpc_pollset *pollset = create_grpc_pollset(&mu); + + for (i = 0; i < 2; i++) { + get_socket_pair(sv[i]); + + em_fd[i] = grpc_fd_create(sv[i][0], "test_grpc_fd_1_read_notifier_pollset"); + + grpc_pollset_add_fd(&exec_ctx, pollset, em_fd[i]); + + on_read_closure.cb = read_notifier_test_callback; + fd_context.fd = em_fd[i]; + fd_context.is_cb_called = false; + on_read_closure.cb_arg = &fd_context; + grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure); + + data = 0; + result = write(sv[i][1], &data, sizeof(data)); + GPR_ASSERT(result == 1); + + gpr_mu_lock(mu); + while (!fd_context.is_cb_called) { + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(mu); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(mu); + } + gpr_mu_unlock(mu); + + /* Drain the socket (Not really needed for the test) */ + result = read(sv[i][0], &data, 1); + GPR_ASSERT(result == 1); + } + + + for (i = 0; i < 2; i++) { + grpc_fd_orphan(&exec_ctx, em_fd[i], NULL, NULL, ""); + close(sv[i][1]); + } + + free_grpc_pollset(&exec_ctx, pollset); + grpc_exec_ctx_finish(&exec_ctx); +} + int main(int argc, char **argv) { grpc_closure destroyed; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -527,6 +624,7 @@ int main(int argc, char **argv) { grpc_pollset_init(g_pollset, &g_mu); test_grpc_fd(); test_grpc_fd_change(); + test_grpc_fd_read_notifier_pollset(); grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); From 89bbc7817a7e68d9bbc6207af34614e1610c70e6 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 11 Apr 2016 23:10:13 -0700 Subject: [PATCH 04/11] Rewrite test case to handle more scenarios --- test/core/iomgr/fd_posix_test.c | 78 +++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 18cd825df00..187720e1de2 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -554,64 +554,95 @@ static void free_grpc_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_closure destroyed; grpc_closure_init(&destroyed, destroy_pollset, pollset); grpc_pollset_shutdown(exec_ctx, pollset, &destroyed); - grpc_exec_ctx_finish(exec_ctx); + grpc_exec_ctx_flush(exec_ctx); gpr_free(pollset); } -static void test_grpc_fd_read_notifier_pollset(void) { +/* This tests that the read_notifier_pollset field of a grpc_fd is properly + set when the grpc_fd becomes readable + - This tests both basic and multi pollsets + - The parameter register_cb_after_read_event controls whether the on-read + callback registration (i.e the one done by grpc_fd_notify_on_read()) is + done either before or after the fd becomes readable + */ +static void test_grpc_fd_read_notifier_pollset( + bool register_cb_after_read_event) { grpc_fd *em_fd[2]; - read_notifier_test_fd_context fd_context; int sv[2][2]; + gpr_mu *mu[2]; + grpc_pollset *pollset[2]; char data; ssize_t result; int i; + grpc_pollset_worker *worker; + read_notifier_test_fd_context fd_context; grpc_closure on_read_closure; - gpr_mu *mu; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset *pollset = create_grpc_pollset(&mu); - for (i = 0; i < 2; i++) { - get_socket_pair(sv[i]); + pollset[i] = create_grpc_pollset(&mu[i]); + 
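    /* (Why create_grpc_pollset() allocates gpr_malloc(grpc_pollset_size())
       bytes: grpc_pollset is opaque to callers and its size is reported by
       the active event engine -- see g_event_engine->pollset_size in
       ev_posix.c earlier in this series.) */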
get_socket_pair(sv[i]); /* sv[i][0] & sv[i][1] will have the socket pair */ + em_fd[i] = grpc_fd_create(sv[i][0], "test_grpc_fd_read_notifier_pollset"); + grpc_pollset_add_fd(&exec_ctx, pollset[i], em_fd[i]); + } - em_fd[i] = grpc_fd_create(sv[i][0], "test_grpc_fd_1_read_notifier_pollset"); + /* At this point pollset[0] has em_fd[0] and pollset[1] has em_fd[1] and both + are basic pollsets. Make pollset[1] a multi-pollset by adding em_fd[0] to + it */ + grpc_pollset_add_fd(&exec_ctx, pollset[1], em_fd[0]); + grpc_exec_ctx_flush(&exec_ctx); - grpc_pollset_add_fd(&exec_ctx, pollset, em_fd[i]); + /* The following tests that the read_notifier_pollset is correctly set on the + grpc_fd structure in both basic pollset and multi pollset cases. + pollset[0] is a basic pollset containing just em_fd[0] + pollset[1] is a multi pollset containing em_fd[0] and em_fd[1] */ + for (i = 0; i < 2; i++) { on_read_closure.cb = read_notifier_test_callback; fd_context.fd = em_fd[i]; fd_context.is_cb_called = false; on_read_closure.cb_arg = &fd_context; - grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure); + + if (!register_cb_after_read_event) { + /* Registering the callback BEFORE the fd is readable */ + grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure); + } data = 0; result = write(sv[i][1], &data, sizeof(data)); GPR_ASSERT(result == 1); - gpr_mu_lock(mu); - while (!fd_context.is_cb_called) { - grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(mu); - grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(mu); + /* grpc_pollset_work requires the caller to hold the pollset mutex */ + gpr_mu_lock(mu[i]); + worker = NULL; + grpc_pollset_work(&exec_ctx, pollset[i], &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_mu_unlock(mu[i]); + grpc_exec_ctx_flush(&exec_ctx); + + if (register_cb_after_read_event) { + /* Registering the callback after the fd is readable. In this case, the + callback should be executed right away. 
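       (The fd keeps a tri-state closure slot: CLOSURE_NOT_READY,
       CLOSURE_READY, or a caller-supplied closure. The grpc_pollset_work()
       call above already drove set_ready_locked(), leaving the slot at
       CLOSURE_READY, so this notify_on call takes the notify_on_locked()
       branch that enqueues the closure immediately instead of parking it.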
*/ + grpc_fd_notify_on_read(&exec_ctx, em_fd[i], &on_read_closure); + grpc_exec_ctx_flush(&exec_ctx); } - gpr_mu_unlock(mu); + + /* The callback should have been called by now */ + GPR_ASSERT(fd_context.is_cb_called); /* Drain the socket (Not really needed for the test) */ result = read(sv[i][0], &data, 1); GPR_ASSERT(result == 1); } - + /* Clean up */ for (i = 0; i < 2; i++) { grpc_fd_orphan(&exec_ctx, em_fd[i], NULL, NULL, ""); close(sv[i][1]); + free_grpc_pollset(&exec_ctx, pollset[i]); } - free_grpc_pollset(&exec_ctx, pollset); grpc_exec_ctx_finish(&exec_ctx); } @@ -624,7 +655,8 @@ int main(int argc, char **argv) { grpc_pollset_init(g_pollset, &g_mu); test_grpc_fd(); test_grpc_fd_change(); - test_grpc_fd_read_notifier_pollset(); + test_grpc_fd_read_notifier_pollset(false); + test_grpc_fd_read_notifier_pollset(true); grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); From fe115892d52b96946f3e661616468de059347e5c Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 12 Apr 2016 09:24:38 -0700 Subject: [PATCH 05/11] Delete debug log lines --- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 59 -------------------- src/core/lib/iomgr/tcp_server_posix.c | 13 ----- src/core/lib/surface/server.c | 10 ---- 3 files changed, 82 deletions(-) diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 77a67d20078..5800b372106 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -516,17 +516,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (*st == CLOSURE_NOT_READY) { - /* TODO (sreek): Remove following log line */ - gpr_log(GPR_INFO, "\t>> notify_on_locked: (fd:%d) CLOSURE_NOT_READY -> %p", - fd->fd, closure); /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { - /* TODO (sreek): Remove following log line */ - gpr_log(GPR_INFO, - "\t>> notify_on_locked: (fd:%d) CLOSURE_READY -> CLOSURE_NOT_READY " - "(enqueue: %p)", - fd->fd, closure); /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); @@ -545,26 +537,13 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st) { if (*st == CLOSURE_READY) { /* duplicate ready ==> ignore */ - /* TODO (sreek): Remove following log line */ - gpr_log(GPR_INFO, - "\t>> set_ready_locked: (fd:%d) CLOSURE_READY -> CLOSURE_READY (no " - "change)", - fd->fd); return 0; } else if (*st == CLOSURE_NOT_READY) { /* not ready, and not waiting ==> flag ready */ - /* TODO (sreek): Remove following log line */ - gpr_log(GPR_INFO, - "\t>> set_ready_locked: (fd:%d) CLOSURE_NOT_READY -> CLOSURE_READY", - fd->fd); *st = CLOSURE_READY; return 0; } else { /* waiting ==> queue closure */ - /* TODO (sreek): Remove following log line */ - gpr_log(GPR_INFO, - "\t>> set_ready_locked: (fd:%d) Enqueue %p -> CLOSURE_NOT_READY", - fd->fd, *st); grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); *st = CLOSURE_NOT_READY; return 1; @@ -573,10 +552,6 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, static void set_read_notifier_pollset_locked( grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { - /* TODO(sreek): Remove the 
following log line */ - gpr_log(GPR_INFO, "\t>> Set read notifier (fd:%d): %p --> %p", fd->fd, - fd->read_notifier_pollset, read_notifier_pollset); - fd->read_notifier_pollset = read_notifier_pollset; } @@ -701,13 +676,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, watcher->prev->next = watcher->next; } if (got_read) { - /*TODO(sreek): Delete this log line */ - gpr_log(GPR_INFO, - "\t>> fd_end_poll(): GOT READ Calling set_ready_locked. fd: %d, " - "fd->read_closure: %p, " - "notifier_pollset: %p", - fd->fd, fd->read_closure, read_notifier_pollset); - if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { kick = 1; } @@ -717,11 +685,6 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, } } if (got_write) { - /*TODO(sreek): Delete this log line */ - gpr_log(GPR_INFO, - "\t>> fd_end_poll(): GOT WRITE set_ready_locked. fd: %d, " - "fd->write_closure: %p", - fd->fd, fd->write_closure); if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { kick = 1; } @@ -1286,12 +1249,6 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } if (nfds > 2) { - /* TODO(sreek): delete the following comment line */ - gpr_log( - GPR_INFO, - "\t>> basic_pollset_maybe_work_and_unlock(): fd->fd: %d, pollset: %p " - "is readable (calling fd_end_poll()) -------------------------------", - pfd[2].fd, pollset); fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, pfd[2].revents & POLLOUT_CHECK, pollset); } else if (fd) { @@ -1449,11 +1406,6 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); continue; } - /*TODO(sree) - Delete this log line*/ - gpr_log(GPR_INFO, - "multipoll_with_poll_pollset(). fd: %d became redable. 
Pollset: " - "%p (calling fd_end_poll())*************", - pfds[i].fd, pollset); fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } @@ -1761,20 +1713,9 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } else { if (read_ev || cancel) { - /* TODO(sreek): Delete this once the issue #5470 is resolved */ - gpr_log( - GPR_INFO, - "\t>> multipoll_with_epoll_pollset: Calling " - "fd_become_readable(fd->fd: %d, pollset: %p) ++++++++++++", - fd->fd, pollset); fd_become_readable(exec_ctx, fd, pollset); } if (write_ev || cancel) { - /* TODO(sreek): Delete the following log line */ - gpr_log(GPR_INFO, - "\t>> multipoll_with_epoll_pollset: Calling " - "fd_become_writable(fd: %d)", - fd->fd); fd_become_writable(exec_ctx, fd); } } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 03318151ccf..7045a260520 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -317,12 +317,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { goto error; } - /* TODO(sreek): Delete the following log line */ - gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Getting read notifier"); read_notifier_pollset = grpc_fd_get_read_notifier_pollset(exec_ctx, sp->emfd); - /* TODO(sreek): Delete the following log line */ - gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Got read notifier: %p", - read_notifier_pollset); /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { @@ -358,18 +353,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { fdobj = grpc_fd_create(fd, name); if (read_notifier_pollset == NULL) { - /* TODO(sreek): Check when this would happen - Ideally this should not - * happen. Remove the next log-line once this is resolved */ - gpr_log(GPR_INFO, "\t** *******!!! tcp_server_posix.on_read(): " - "read_notifier_pollset is NULL. 
!!!**********************"); - gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); goto error; } - /* TODO(sreek): Delete the following log line */ - gpr_log(GPR_INFO, "\t\t** tcp_server_posix.on_read(): Adding fd %d *only* to pollset %p", - fd, read_notifier_pollset); grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); sp->server->on_accept_cb( diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 25b6886f241..cbfd2458741 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1032,16 +1032,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; - /* TODO(sreek): Delete this commented block once issue #5470 is resolved */ - /* - size_t i; - for (i = 0; i < s->cq_count; i++) { - memset(&op, 0, sizeof(op)); - op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(exec_ctx, transport, &op); - } - */ - channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); chand = (channel_data *)grpc_channel_stack_element( From 5e28d71f3de6e4edc72b703e07b43709d8cc783f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 12 Apr 2016 10:45:07 -0700 Subject: [PATCH 06/11] fix formatting --- src/core/lib/iomgr/ev_posix.c | 3 ++- src/core/lib/iomgr/ev_posix.h | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index af4126c900c..8c6ec90684f 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -83,7 +83,8 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, g_event_engine->fd_notify_on_write(exec_ctx, fd, closure); } -grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 4cfa83e6a23..344bf63438a 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,7 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, - grpc_fd *fd); + grpc_fd *fd); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -141,7 +141,7 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, /* Return the read notifier pollset from the fd */ grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, - grpc_fd *fd); + grpc_fd *fd); /* pollset_posix functions */ From 9e926e8408803fcfdb5380caa28bdef73a6ddb5f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 14 Apr 2016 10:54:14 -0700 Subject: [PATCH 07/11] Test failures fix --- test/core/end2end/fixtures/h2_sockpair+trace.c | 3 +++ test/core/end2end/fixtures/h2_sockpair.c | 3 +++ test/core/end2end/fixtures/h2_sockpair_1byte.c | 3 +++ 3 files changed, 9 insertions(+) diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c index 87533a9b7f3..b730df753ca 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.c +++ b/test/core/end2end/fixtures/h2_sockpair+trace.c @@ -50,6 +50,7 @@ #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/support/env.h" 
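/* The fixture change below is needed because grpc_server_setup_transport()
   no longer binds the transport to every registered completion queue's
   pollset (that loop was removed earlier in this series); the sockpair
   fixtures therefore add the server endpoint to the completion queue's
   pollset themselves via grpc_endpoint_add_to_pollset(...,
   grpc_cq_pollset(f->cq)), which is why completion_queue.h is now
   included. */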
#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -60,6 +61,8 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { grpc_end2end_test_fixture *f = ts; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_pair *sfd = f->fixture_data; + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(&exec_ctx, f->server, transport, grpc_server_get_channel_args(f->server)); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c index f28147cf40a..41fcc1d6313 100644 --- a/test/core/end2end/fixtures/h2_sockpair.c +++ b/test/core/end2end/fixtures/h2_sockpair.c @@ -49,6 +49,7 @@ #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -59,6 +60,8 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { grpc_end2end_test_fixture *f = ts; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_pair *sfd = f->fixture_data; + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(&exec_ctx, f->server, transport, grpc_server_get_channel_args(f->server)); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c index 302b16b372d..4c805c43706 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.c +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c @@ -49,6 +49,7 @@ #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -59,6 +60,8 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { grpc_end2end_test_fixture *f = ts; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_endpoint_pair* sfd = f->fixture_data; + grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq)); grpc_server_setup_transport(&exec_ctx, f->server, transport, grpc_server_get_channel_args(f->server)); grpc_exec_ctx_finish(&exec_ctx); From 1f5e262589c84c2b5eb9416211bffd1f32998009 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 21 Apr 2016 12:28:09 -0700 Subject: [PATCH 08/11] Add the option of adding a non-listening server completion queue. 
This makes writing certain test cases (like hybrid_end2end tests) easier --- .../grpc++/impl/codegen/completion_queue.h | 11 ++++- include/grpc++/server_builder.h | 10 ++++- include/grpc/grpc.h | 9 ++++ src/core/lib/surface/completion_queue.c | 11 +++++ src/core/lib/surface/completion_queue.h | 2 + src/core/lib/surface/server.c | 44 ++++++++++++++----- src/cpp/server/server_builder.cc | 18 ++++++-- test/cpp/end2end/hybrid_end2end_test.cc | 2 +- 8 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 56864d6d536..d489a90c69c 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -222,9 +222,18 @@ class CompletionQueue : private GrpcLibraryCodegen { /// A specific type of completion queue used by the processing of notifications /// by servers. Instantiated by \a ServerBuilder. class ServerCompletionQueue : public CompletionQueue { + public: + bool IsFrequentlyPolled() { return is_frequently_polled_; } + private: + bool is_frequently_polled_; friend class ServerBuilder; - ServerCompletionQueue() {} + /// \param is_frequently_polled Informs the gRPC library about whether the + /// server completion queue would be actively polled (by calling Next() or + /// AsyncNext()). By default all server completion queues are assumed to be + /// frequently polled. + ServerCompletionQueue(bool is_frequently_polled = true) + : is_frequently_polled_(is_frequently_polled) {} }; } // namespace grpc

diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 86c7fecef59..85af9aa57fb 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -102,7 +102,15 @@ class ServerBuilder { /// Add a completion queue for handling asynchronous services /// Caller is required to keep this completion queue live until /// the server is destroyed. - std::unique_ptr<ServerCompletionQueue> AddCompletionQueue(); + /// + /// \param is_frequently_polled This is an optional parameter to inform the gRPC + /// library about whether this completion queue would be frequently polled + /// (i.e. by calling Next() or AsyncNext()). The default value is 'true' and is + /// the recommended setting. Setting this to 'false' (i.e. not polling the + /// completion queue frequently) will have a significantly negative + /// performance impact and hence should not be used in production use cases. + std::unique_ptr<ServerCompletionQueue> AddCompletionQueue( + bool is_frequently_polled = true); /// Return a running server which is ready for processing calls. std::unique_ptr<Server> BuildAndStart();

diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 5c868aece37..059bd2ebc74 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -334,6 +334,15 @@ GRPCAPI void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved); +/** Register a non-listening completion queue with the server. This API is + similar to grpc_server_register_completion_queue except that the server will + not use this completion_queue to listen to any incoming channels. + + Registering a non-listening completion queue will have a negative performance + impact and hence this API is not recommended for production use cases. */ +GRPCAPI void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *q, void *reserved); + /** Add an HTTP2 over plaintext over tcp listener. Returns bound port number on success, 0 on failure. 
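    For example (illustrative sketch only; the address literal is a
    placeholder):
      int port = grpc_server_add_insecure_http2_port(server, "localhost:50051");
      GPR_ASSERT(port != 0);  -- a return value of 0 means the bind failed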
REQUIRES: server not started */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5ec8808b508..f6f7ac880cc 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -70,6 +70,8 @@ struct grpc_completion_queue { int shutdown; int shutdown_called; int is_server_cq; + /** Can the server cq accept incoming channels */ + int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -149,6 +151,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->shutdown = 0; cc->shutdown_called = 0; cc->is_server_cq = 0; + cc->is_non_listening_server_cq = 0; cc->num_pluckers = 0; #ifndef NDEBUG cc->outstanding_tag_count = 0; @@ -507,6 +510,14 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return POLLSET_FROM_CQ(cc); } +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + cc->is_non_listening_server_cq = 1; +} + +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + return (cc->is_non_listening_server_cq == 1); +} + void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index eef82cf0148..ee3e0448401 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -82,6 +82,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index cbfd2458741..c34ec04d2d3 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -895,23 +895,45 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq, - void *reserved) { +static void register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + bool is_non_listening, void *reserved) { size_t i, n; - GRPC_API_TRACE( - "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, - (server, cq, reserved)); GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } - GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); - n = server->cq_count++; - server->cqs = gpr_realloc(server->cqs, - server->cq_count * sizeof(grpc_completion_queue *)); - server->cqs[n] = cq; + + /* Non-listening completion queues are not added to server->cqs */ + if (is_non_listening) { + grpc_cq_mark_non_listening_server_cq(cq); + } else { + GRPC_CQ_INTERNAL_REF(cq, "server"); + n = server->cq_count++; + server->cqs = gpr_realloc( + server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); + server->cqs[n] = cq; + } +} + +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, + (server, cq, reserved)); + register_completion_queue(server, cq, false, reserved); +} 
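/* Usage sketch (editorial; the queue's lifetime is managed by the caller as
   with any completion queue):

     grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
     grpc_server_register_non_listening_completion_queue(server, cq, NULL);

   A queue registered this way is only marked via grpc_cq_mark_server_cq()
   and grpc_cq_mark_non_listening_server_cq(); it is never appended to
   server->cqs, so the server will not use it to field incoming channels. */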
+ +void grpc_server_register_non_listening_completion_queue( grpc_server *server, grpc_completion_queue *cq, void *reserved) { GRPC_API_TRACE( "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " "reserved=%p)", 3, (server, cq, reserved)); register_completion_queue(server, cq, true, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {

diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 68cc38258cf..5445d3e13bc 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -46,8 +46,9 @@ ServerBuilder::ServerBuilder() grpc_compression_options_init(&compression_options_); } -std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() { - ServerCompletionQueue* cq = new ServerCompletionQueue(); +std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( + bool is_frequently_polled) { + ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -105,8 +106,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); + // A completion queue that is not polled frequently (by calling Next() or + // AsyncNext()) is not safe to use for listening to incoming channels. + // Register all such completion queues as non-listening completion queues + // with the GRPC core library. + if ((*cq)->IsFrequentlyPolled()) { + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); + } else { + grpc_server_register_non_listening_completion_queue(server->server_, (*cq)->cq(), nullptr); + } } for (auto service = services_.begin(); service != services_.end(); service++) {

diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 02043a89d3a..0423448154d 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test { } // Create a separate cq for each potential handler. 
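    // What happens downstream (sketch): because these queues are created with
    // is_frequently_polled = false, BuildAndStart() will register each of them
    // via grpc_server_register_non_listening_completion_queue() instead of the
    // regular listening path shown above.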
for (int i = 0; i < 5; i++) { - cqs_.push_back(builder.AddCompletionQueue()); + cqs_.push_back(builder.AddCompletionQueue(false)); } server_ = builder.BuildAndStart(); }

From 7def036085bbbe61a908668da0e92c11eb4b921a Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 21 Apr 2016 14:54:32 -0700 Subject: [PATCH 09/11] Add a safety check to ensure at least one of the completion queues is a listening completion queue (i.e. frequently polled) --- src/cpp/server/server_builder.cc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+)

diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 5445d3e13bc..c0d13951d70 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -86,8 +86,11 @@ void ServerBuilder::AddListeningPort(const grpc::string& addr, std::unique_ptr<Server> ServerBuilder::BuildAndStart() { std::unique_ptr<ThreadPoolInterface> thread_pool; + // Does this server have at least one sync method + bool has_sync_methods = false; for (auto it = services_.begin(); it != services_.end(); ++it) { if ((*it)->service->has_synchronous_methods()) { + has_sync_methods = true; if (thread_pool == nullptr) { thread_pool.reset(CreateDefaultThreadPool()); break; } } } @@ -105,6 +108,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { compression_options_.enabled_algorithms_bitset); std::unique_ptr<Server> server( new Server(thread_pool.release(), true, max_message_size_, &args)); + + // If the server has at least one sync method, we know that this is a Sync + // server or a Hybrid server and the completion queue (server->cq_) would be + // frequently polled. + int num_frequently_polled_cqs = has_sync_methods ? 1 : 0; + for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { // A completion queue that is not polled frequently (by calling Next() or // AsyncNext()) is not safe to use for listening to incoming channels. 
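// Accounting sketch for the safety check added below: a server with any
// synchronous methods starts num_frequently_polled_cqs at 1 on behalf of its
// internal server->cq_; each frequently polled ServerCompletionQueue adds one
// more; if the count is still zero after the loop, BuildAndStart() logs an
// error and returns nullptr, since no queue would ever surface incoming calls.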
@@ -113,11 +122,19 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { if ((*cq)->IsFrequentlyPolled()) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); + num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, (*cq)->cq(), nullptr); } } + + if (num_frequently_polled_cqs == 0) { + gpr_log(GPR_ERROR, + "At least one of the completion queues must be frequently polled"); + return nullptr; + } + for (auto service = services_.begin(); service != services_.end(); service++) { if (!server->RegisterService((*service)->host.get(), (*service)->service)) {

From 01907123f6323a7494551e7a45e342dcdc068864 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 21 Apr 2016 15:09:13 -0700 Subject: [PATCH 10/11] generate_projects.sh and fix copyright year --- grpc.def | 1 + include/grpc++/impl/codegen/completion_queue.h | 2 +- include/grpc++/server_builder.h | 2 +- include/grpc/grpc.h | 2 +- src/core/lib/surface/completion_queue.c | 2 +- src/core/lib/surface/completion_queue.h | 2 +- src/core/lib/surface/server.c | 2 +- src/cpp/server/server_builder.cc | 2 +- src/proto/grpc/binary_log/v1alpha/log.proto | 2 +- src/python/grpcio/grpc/_cython/imports.generated.c | 2 ++ src/python/grpcio/grpc/_cython/imports.generated.h | 3 +++ src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 ++ src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 +++ tools/fuzzer/runners/client_fuzzer.sh | 2 +- tools/fuzzer/runners/hpack_parser_fuzzer_test.sh | 2 +- tools/fuzzer/runners/http_fuzzer_test.sh | 2 +- tools/fuzzer/runners/json_fuzzer_test.sh | 2 +- tools/fuzzer/runners/nanopb_fuzzer_response_test.sh | 2 +- tools/fuzzer/runners/nanopb_fuzzer_serverlist_test.sh | 2 +- tools/fuzzer/runners/server_fuzzer.sh | 2 +- tools/fuzzer/runners/uri_fuzzer_test.sh | 2 +- 21 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/grpc.def b/grpc.def index f81aa1b05a6..943b464c31f 100644 --- a/grpc.def +++ b/grpc.def @@ -77,6 +77,7 @@ EXPORTS grpc_server_request_registered_call grpc_server_create grpc_server_register_completion_queue + grpc_server_register_non_listening_completion_queue grpc_server_add_insecure_http2_port grpc_server_start grpc_server_shutdown_and_notify

diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index d489a90c69c..1b84b447050 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index f6f7ac880cc..d5eb24270e3 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index ee3e0448401..1528ca4ad8f 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index c34ec04d2d3..0a84d8e7cda 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c0d13951d70..9cd7cb2da37 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/proto/grpc/binary_log/v1alpha/log.proto b/src/proto/grpc/binary_log/v1alpha/log.proto index 6cc473be74e..83166cd4104 100644 --- a/src/proto/grpc/binary_log/v1alpha/log.proto +++ b/src/proto/grpc/binary_log/v1alpha/log.proto @@ -105,4 +105,4 @@ message Message { // The contents of the message. May be a prefix instead of the complete // message. 
bytes data = 5; -} \ No newline at end of file +} diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 8bd6ae6372b..edad9a3131a 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -115,6 +115,7 @@ grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; grpc_server_create_type grpc_server_create_import; grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; +grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; grpc_server_start_type grpc_server_start_import; grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import; @@ -381,6 +382,7 @@ void pygrpc_load_imports(HMODULE library) { grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create"); grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue"); + grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue"); grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port"); grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start"); grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 272e85b4857..7354de4ba29 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -295,6 +295,9 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import +typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved); +extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; +#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr); extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 56db4ec686b..149ce6c48a4 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ 
@@ -115,6 +115,7 @@ grpc_server_register_method_type grpc_server_register_method_import;
 grpc_server_request_registered_call_type grpc_server_request_registered_call_import;
 grpc_server_create_type grpc_server_create_import;
 grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
+grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
 grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 grpc_server_start_type grpc_server_start_import;
 grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import;
@@ -377,6 +378,7 @@ void grpc_rb_load_imports(HMODULE library) {
   grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call");
   grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create");
   grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue");
+  grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue");
   grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port");
   grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start");
   grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index c526f434c61..098319db77c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -295,6 +295,9 @@ extern grpc_server_create_type grpc_server_create_import;
 typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved);
 extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import;
 #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import
+typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved);
+extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import;
+#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import
 typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr);
 extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import;
 #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import
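The Python and Ruby generated stubs above all follow one Windows dynamic-loading pattern per exported symbol: a function-pointer typedef, a global that holds the resolved address, and a macro that redirects call sites to that global, with GetProcAddress filling in the global at load time. A minimal sketch of that pattern (using a hypothetical grpc_foo symbol, not a real gRPC export):

/* Sketch of the generated-stub pattern; grpc_foo is a made-up symbol. */
#include <windows.h>

/* One exported symbol, expanded by hand; the generated files repeat this
   triple for every entry in the core API surface. */
typedef int (*grpc_foo_type)(int arg);
grpc_foo_type grpc_foo_import;
#define grpc_foo grpc_foo_import

/* Called once after the library DLL is loaded; afterwards a call such as
   grpc_foo(42) goes through the resolved pointer. */
void load_imports(HMODULE library) {
  grpc_foo_import = (grpc_foo_type)GetProcAddress(library, "grpc_foo");
}

Adding grpc_server_register_non_listening_completion_queue therefore touches three places per language binding: the typedef/extern/#define block in the header and the GetProcAddress line in the loader.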
diff --git a/tools/fuzzer/runners/client_fuzzer.sh b/tools/fuzzer/runners/client_fuzzer.sh
index 239d552c57d..97d4e60d908 100644
--- a/tools/fuzzer/runners/client_fuzzer.sh
+++ b/tools/fuzzer/runners/client_fuzzer.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=2048"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/hpack_parser_fuzzer_test.sh b/tools/fuzzer/runners/hpack_parser_fuzzer_test.sh
index e69b4b4dfe2..c6f70a623dc 100644
--- a/tools/fuzzer/runners/hpack_parser_fuzzer_test.sh
+++ b/tools/fuzzer/runners/hpack_parser_fuzzer_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=512"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/http_fuzzer_test.sh b/tools/fuzzer/runners/http_fuzzer_test.sh
index c190ba40b60..bb54a238145 100644
--- a/tools/fuzzer/runners/http_fuzzer_test.sh
+++ b/tools/fuzzer/runners/http_fuzzer_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=2048"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/json_fuzzer_test.sh b/tools/fuzzer/runners/json_fuzzer_test.sh
index 9fc6271976b..e11e25dc097 100644
--- a/tools/fuzzer/runners/json_fuzzer_test.sh
+++ b/tools/fuzzer/runners/json_fuzzer_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=512"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/nanopb_fuzzer_response_test.sh b/tools/fuzzer/runners/nanopb_fuzzer_response_test.sh
index bbcebf11cce..97359277ce2 100644
--- a/tools/fuzzer/runners/nanopb_fuzzer_response_test.sh
+++ b/tools/fuzzer/runners/nanopb_fuzzer_response_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=128"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/nanopb_fuzzer_serverlist_test.sh b/tools/fuzzer/runners/nanopb_fuzzer_serverlist_test.sh
index e9099bac046..2dfaa2372fc 100644
--- a/tools/fuzzer/runners/nanopb_fuzzer_serverlist_test.sh
+++ b/tools/fuzzer/runners/nanopb_fuzzer_serverlist_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=128"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/server_fuzzer.sh b/tools/fuzzer/runners/server_fuzzer.sh
index 28ca8b32719..fc0567f670b 100644
--- a/tools/fuzzer/runners/server_fuzzer.sh
+++ b/tools/fuzzer/runners/server_fuzzer.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=2048"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
diff --git a/tools/fuzzer/runners/uri_fuzzer_test.sh b/tools/fuzzer/runners/uri_fuzzer_test.sh
index 7dac54ec518..5f33e734654 100644
--- a/tools/fuzzer/runners/uri_fuzzer_test.sh
+++ b/tools/fuzzer/runners/uri_fuzzer_test.sh
@@ -33,7 +33,7 @@ flags="-max_total_time=$runtime -artifact_prefix=fuzzer_output/ -max_len=128"
 
 if [ "$jobs" != "1" ]
 then
-  flags="-jobs=$jobs -workers=$jobs"
+  flags="-jobs=$jobs -workers=$jobs $flags"
 fi
 
 if [ "$config" == "asan-trace-cmp" ]
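All eight runner scripts get the same one-line fix: the old assignment flags="-jobs=$jobs -workers=$jobs" replaced the variable outright, so any run with more than one job silently dropped -max_total_time, -artifact_prefix, and -max_len; the new assignment prepends the parallelism flags while keeping the base set. The same overwrite-versus-extend distinction rendered in C (an illustration only, with made-up values; not code from the patch):

/* Illustration of the flag-clobbering bug fixed above. */
#include <stdio.h>

int main(void) {
  char flags[256];
  int jobs = 2;
  /* Base flags, set unconditionally (mirrors the first assignment in the
     scripts; 3600 and 2048 are illustrative values). */
  snprintf(flags, sizeof(flags),
           "-max_total_time=%d -artifact_prefix=fuzzer_output/ -max_len=%d",
           3600, 2048);
  if (jobs != 1) {
    char final_flags[512];
    /* Buggy form overwrote the base flags, as in
       flags="-jobs=$jobs -workers=$jobs"; the fixed form extends them, as in
       flags="-jobs=$jobs -workers=$jobs $flags". */
    snprintf(final_flags, sizeof(final_flags), "-jobs=%d -workers=%d %s", jobs,
             jobs, flags);
    printf("%s\n", final_flags);
  }
  return 0;
}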
From 0b9fdd8adc4b2d167e33a6d39e7ff4a46ef9a65c Mon Sep 17 00:00:00 2001
From: Sree Kuchibhotla
Date: Thu, 21 Apr 2016 17:27:41 -0700
Subject: [PATCH 11/11] clang format fix

---
 test/core/end2end/fixtures/h2_sockpair_1byte.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 4c805c43706..16ffb6ec13f 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -60,7 +60,7 @@ static void server_setup_transport(void *ts, grpc_transport *transport) {
   grpc_end2end_test_fixture *f = ts;
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 
-  grpc_endpoint_pair* sfd = f->fixture_data;
+  grpc_endpoint_pair *sfd = f->fixture_data;
   grpc_endpoint_add_to_pollset(&exec_ctx, sfd->server, grpc_cq_pollset(f->cq));
   grpc_server_setup_transport(&exec_ctx, f->server, transport,
                               grpc_server_get_channel_args(f->server));
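The reformatted declaration follows the clang-format style used here, which binds '*' to the declarator rather than to the type. In C that placement is more than cosmetic, since '*' attaches per declarator, not per declaration (a sketch for illustration, not part of the patch):

/* Why right-aligned '*' reads correctly in multi-declarations. */
int main(void) {
  int x = 0;
  int *a, b; /* a is int*, b is plain int: the '*' binds to 'a' alone */
  a = &x;
  b = *a + 1;
  return b; /* returns 1 */
}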