From 8afeec88380c1e226949f0017540cb646d37eb5e Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Mon, 28 Sep 2015 17:03:34 -0700
Subject: [PATCH] Fix alarms

- make kick_poller() do something on POSIX
- fix some conditions whereby alarms are held in a pollset exec context for too long
- make channel_connectivity tests dependent on the correct behavior
---
 src/core/iomgr/iomgr.c                             |  7 ---
 .../iomgr/pollset_multipoller_with_epoll.c         | 25 +++++++--
 .../pollset_multipoller_with_poll_posix.c          | 22 +++++---
 src/core/iomgr/pollset_posix.c                     | 43 +++++++++-----
 src/core/iomgr/pollset_posix.h                     |  1 +
 .../core/end2end/tests/channel_connectivity.c      | 56 +++++++++++++----
 6 files changed, 110 insertions(+), 44 deletions(-)

diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 612419b70e4..d8d84f7457d 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -51,13 +51,6 @@ static gpr_cv g_rcv;
 static int g_shutdown;
 static grpc_iomgr_object g_root_object;
 
-void grpc_kick_poller(void) {
-  /* Empty. The background callback executor polls periodically. The activity
-   * the kicker is trying to draw the executor's attention to will be picked up
-   * either by one of the periodic wakeups or by one of the polling application
-   * threads. */
-}
-
 void grpc_iomgr_init(void) {
   g_shutdown = 0;
   gpr_mu_init(&g_mu);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index a4293eb4a40..bcb36b472b1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -211,11 +211,15 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
         int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
         int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
         int write = ep_ev[i].events & EPOLLOUT;
-        if (read || cancel) {
-          grpc_fd_become_readable(exec_ctx, fd);
-        }
-        if (write || cancel) {
-          grpc_fd_become_writable(exec_ctx, fd);
+        if (fd == NULL) {
+          grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+        } else {
+          if (read || cancel) {
+            grpc_fd_become_readable(exec_ctx, fd);
+          }
+          if (write || cancel) {
+            grpc_fd_become_writable(exec_ctx, fd);
+          }
         }
       }
     }
@@ -244,6 +248,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
                                      size_t nfds) {
   size_t i;
   pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+  struct epoll_event ev;
+  int err;
 
   pollset->vtable = &multipoll_with_epoll_pollset;
   pollset->data.ptr = h;
@@ -253,6 +259,15 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
     gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
     abort();
   }
+
+  ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+  ev.data.ptr = NULL;
+  err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
+  if (err < 0) {
+    gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
+            GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), strerror(errno));
+  }
+
   for (i = 0; i < nfds; i++) {
     multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
   }
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 44031b8ef66..240e9daf8ee 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -114,13 +114,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
   h = pollset->data.ptr;
   timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
   /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
-  pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
-  watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+  pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
+  watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
   fd_count = 0;
-  pfd_count = 1;
-  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+  pfd_count = 2;
+  pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
   pfds[0].events = POLLIN;
-  pfds[0].revents = POLLOUT;
+  pfds[0].revents = 0;
+  pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+  pfds[1].events = POLLIN;
+  pfds[1].revents = 0;
   for (i = 0; i < h->fd_count; i++) {
     int remove = grpc_fd_is_orphaned(h->fds[i]);
     for (j = 0; !remove && j < h->del_count; j++) {
@@ -143,7 +146,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
   h->fd_count = fd_count;
   gpr_mu_unlock(&pollset->mu);
 
-  for (i = 1; i < pfd_count; i++) {
+  for (i = 2; i < pfd_count; i++) {
     pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
                                                POLLOUT, &watchers[i]);
   }
@@ -152,7 +155,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
   r = grpc_poll_function(pfds, pfd_count, timeout);
   GRPC_SCHEDULING_END_BLOCKING_REGION;
 
-  for (i = 1; i < pfd_count; i++) {
+  for (i = 2; i < pfd_count; i++) {
     grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
                      pfds[i].revents & POLLOUT);
   }
@@ -165,9 +168,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
     /* do nothing */
   } else {
     if (pfds[0].revents & POLLIN) {
+      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+    }
+    if (pfds[1].revents & POLLIN) {
       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
     }
-    for (i = 1; i < pfd_count; i++) {
+    for (i = 2; i < pfd_count; i++) {
       if (watchers[i].fd == NULL) {
         continue;
       }
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index e80963e0ea8..7adb0e626ea 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -58,6 +58,7 @@ GPR_TLS_DECL(g_current_thread_poller);
 GPR_TLS_DECL(g_current_thread_worker);
 
 grpc_poll_function_type grpc_poll_function = poll;
+grpc_wakeup_fd grpc_global_wakeup_fd;
 
 static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
   worker->prev->next = worker->next;
@@ -121,12 +122,18 @@ void grpc_pollset_global_init(void) {
   gpr_tls_init(&g_current_thread_poller);
   gpr_tls_init(&g_current_thread_worker);
   grpc_wakeup_fd_global_init();
+  grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
 }
 
 void grpc_pollset_global_shutdown(void) {
+  grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+  grpc_wakeup_fd_global_destroy();
   gpr_tls_destroy(&g_current_thread_poller);
   gpr_tls_destroy(&g_current_thread_worker);
-  grpc_wakeup_fd_global_destroy();
+}
+
+void grpc_kick_poller(void) {
+  grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
 }
 
 /* main interface */
@@ -193,6 +200,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     goto done;
   }
   if (grpc_alarm_check(exec_ctx, now, &deadline)) {
+    gpr_mu_unlock(&pollset->mu);
+    locked = 0;
     goto done;
   }
   if (pollset->shutting_down) {
@@ -294,7 +303,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
   }
   timeout = gpr_time_sub(deadline, now);
   return gpr_time_to_millis(gpr_time_add(
-      timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
+      timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
 }
 
 /*
@@ -439,7 +448,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
                                                 grpc_pollset_worker *worker,
                                                 gpr_timespec deadline,
                                                 gpr_timespec now) {
-  struct pollfd pfd[2];
+  struct pollfd pfd[3];
   grpc_fd *fd;
   grpc_fd_watcher fd_watcher;
   int timeout;
@@ -452,17 +461,20 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
     fd = pollset->data.ptr = NULL;
   }
   timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
-  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+  pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
   pfd[0].events = POLLIN;
   pfd[0].revents = 0;
-  nfds = 1;
+  pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+  pfd[1].events = POLLIN;
+  pfd[1].revents = 0;
+  nfds = 2;
   if (fd) {
-    pfd[1].fd = fd->fd;
-    pfd[1].revents = 0;
+    pfd[2].fd = fd->fd;
+    pfd[2].revents = 0;
     gpr_mu_unlock(&pollset->mu);
-    pfd[1].events =
+    pfd[2].events =
         (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
-    if (pfd[1].events != 0) {
+    if (pfd[2].events != 0) {
       nfds++;
     }
   } else {
@@ -477,8 +489,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
 
   if (fd) {
-    grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[1].revents & POLLIN,
-                     pfd[1].revents & POLLOUT);
+    grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
+                     pfd[2].revents & POLLOUT);
   }
 
   if (r < 0) {
@@ -489,13 +501,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
     /* do nothing */
   } else {
    if (pfd[0].revents & POLLIN) {
+      grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+    }
+    if (pfd[1].revents & POLLIN) {
      grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
    }
-    if (nfds > 1) {
-      if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
+    if (nfds > 2) {
+      if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
        grpc_fd_become_readable(exec_ctx, fd);
      }
-      if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
+      if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
        grpc_fd_become_writable(exec_ctx, fd);
      }
    }
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index f996dd1edfb..83c52585390 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -129,5 +129,6 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
 /* override to allow tests to hook poll() usage */
 typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
 extern grpc_poll_function_type grpc_poll_function;
+extern grpc_wakeup_fd grpc_global_wakeup_fd;
 
 #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/test/core/end2end/tests/channel_connectivity.c b/test/core/end2end/tests/channel_connectivity.c
index 0b7a8a664b1..be9b7fde1dd 100644
--- a/test/core/end2end/tests/channel_connectivity.c
+++ b/test/core/end2end/tests/channel_connectivity.c
@@ -34,19 +34,49 @@
 #include "test/core/end2end/end2end_tests.h"
 
 #include 
+#include 
+#include 
 #include 
 
 #include "test/core/end2end/cq_verifier.h"
 
 static void *tag(gpr_intptr t) { return (void *)t; }
 
+typedef struct {
+  gpr_event started;
+  grpc_channel *channel;
+  grpc_completion_queue *cq;
+} child_events;
+
+static void child_thread(void *arg) {
+  child_events *ce = arg;
+  grpc_event ev;
+  gpr_event_set(&ce->started, (void*)1);
+  gpr_log(GPR_DEBUG, "verifying");
+  ev = grpc_completion_queue_next(ce->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
+  GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+  GPR_ASSERT(ev.tag == tag(1));
+  GPR_ASSERT(ev.success == 0);
+}
+
 static void test_connectivity(grpc_end2end_test_config config) {
   grpc_end2end_test_fixture f = config.create_fixture(NULL, NULL);
   grpc_connectivity_state state;
   cq_verifier *cqv = cq_verifier_create(f.cq);
+  child_events ce;
+  gpr_thd_options thdopt = gpr_thd_options_default();
+  gpr_thd_id thdid;
 
   config.init_client(&f, NULL);
 
+  ce.channel = f.client;
+  ce.cq = f.cq;
+  gpr_event_init(&ce.started);
+  gpr_thd_options_set_joinable(&thdopt);
+  GPR_ASSERT(gpr_thd_new(&thdid, child_thread, &ce, &thdopt));
+
+  gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+
   /* channels should start life in IDLE, and stay there */
   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
              GRPC_CHANNEL_IDLE);
@@ -55,18 +85,24 @@ static void test_connectivity(grpc_end2end_test_config config) {
              GRPC_CHANNEL_IDLE);
 
   /* start watching for a change */
+  gpr_log(GPR_DEBUG, "watching");
   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
-                                        GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
+                                        gpr_now(GPR_CLOCK_MONOTONIC),
                                         f.cq, tag(1));
-  /* nothing should happen */
-  cq_verify_empty(cqv);
+
+  /* eventually the child thread completion should trigger */
+  gpr_thd_join(thdid);
 
   /* check that we're still in idle, and start connecting */
   GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) ==
              GRPC_CHANNEL_IDLE);
 
+  /* start watching for a change */
+  grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
+                                        GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
+                                        f.cq, tag(2));
   /* and now the watch should trigger */
-  cq_expect_completion(cqv, tag(1), 1);
+  cq_expect_completion(cqv, tag(2), 1);
   cq_verify(cqv);
   state = grpc_channel_check_connectivity_state(f.client, 0);
   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
@@ -75,8 +111,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
   /* quickly followed by a transition to TRANSIENT_FAILURE */
   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_CONNECTING,
                                         GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
-                                        f.cq, tag(2));
-  cq_expect_completion(cqv, tag(2), 1);
+                                        f.cq, tag(3));
+  cq_expect_completion(cqv, tag(3), 1);
   cq_verify(cqv);
   state = grpc_channel_check_connectivity_state(f.client, 0);
   GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
@@ -93,8 +129,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
      READY is reached */
   while (state != GRPC_CHANNEL_READY) {
     grpc_channel_watch_connectivity_state(
-        f.client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(3));
-    cq_expect_completion(cqv, tag(3), 1);
+        f.client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(4));
+    cq_expect_completion(cqv, tag(4), 1);
     cq_verify(cqv);
     state = grpc_channel_check_connectivity_state(f.client, 0);
     GPR_ASSERT(state == GRPC_CHANNEL_READY ||
@@ -108,11 +144,11 @@ static void test_connectivity(grpc_end2end_test_config config) {
 
   grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_READY,
                                         GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
-                                        f.cq, tag(4));
+                                        f.cq, tag(5));
 
   grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
 
-  cq_expect_completion(cqv, tag(4), 1);
+  cq_expect_completion(cqv, tag(5), 1);
   cq_expect_completion(cqv, tag(0xdead), 1);
   cq_verify(cqv);
   state = grpc_channel_check_connectivity_state(f.client, 0);