- make kick_poller() do something on POSIX
- fix some conditions whereby alarms are held in a pollset exec context for too long
- make channel_connectivity tests dependent on the correct behavior
pull/3555/head
Craig Tiller 9 years ago
parent 10ce383e1b
commit 8afeec8838
Changed files (lines changed):
  1. src/core/iomgr/iomgr.c (7)
  2. src/core/iomgr/pollset_multipoller_with_epoll.c (25)
  3. src/core/iomgr/pollset_multipoller_with_poll_posix.c (22)
  4. src/core/iomgr/pollset_posix.c (43)
  5. src/core/iomgr/pollset_posix.h (1)
  6. test/core/end2end/tests/channel_connectivity.c (56)

@@ -51,13 +51,6 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
/* Old (removed-by-this-commit) POSIX implementation: a deliberate no-op.
   NOTE(review): this is the pre-change stub; the commit replaces it with a
   real wakeup in pollset_posix.c (grpc_wakeup_fd_wakeup on the global
   wakeup fd). */
void grpc_kick_poller(void) {
/* Empty. The background callback executor polls periodically. The activity
* the kicker is trying to draw the executor's attention to will be picked up
* either by one of the periodic wakeups or by one of the polling application
* threads. */
}
void grpc_iomgr_init(void) {
g_shutdown = 0;
gpr_mu_init(&g_mu);

@@ -211,11 +211,15 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
if (read || cancel) {
grpc_fd_become_readable(exec_ctx, fd);
}
if (write || cancel) {
grpc_fd_become_writable(exec_ctx, fd);
if (fd == NULL) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
} else {
if (read || cancel) {
grpc_fd_become_readable(exec_ctx, fd);
}
if (write || cancel) {
grpc_fd_become_writable(exec_ctx, fd);
}
}
}
}
@@ -244,6 +248,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
epoll_event ev;
int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -253,6 +259,15 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = NULL;
err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
strerror(errno));
}
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
}

@@ -114,13 +114,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
/* TODO(ctiller): perform just one malloc here if we exceed the inline case */
pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
fd_count = 0;
pfd_count = 1;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd_count = 2;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = POLLOUT;
pfds[0].revents = 0;
pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfds[1].events = POLLIN;
pfds[1].revents = 0;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
for (j = 0; !remove && j < h->del_count; j++) {
@@ -143,7 +146,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
for (i = 1; i < pfd_count; i++) {
for (i = 2; i < pfd_count; i++) {
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
POLLOUT, &watchers[i]);
}
@@ -152,7 +155,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
for (i = 1; i < pfd_count; i++) {
for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
pfds[i].revents & POLLOUT);
}
@@ -165,9 +168,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
/* do nothing */
} else {
if (pfds[0].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfds[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
for (i = 1; i < pfd_count; i++) {
for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
continue;
}

@@ -58,6 +58,7 @@ GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
grpc_poll_function_type grpc_poll_function = poll;
grpc_wakeup_fd grpc_global_wakeup_fd;
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
@@ -121,12 +122,18 @@ void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
/* Tear down pollset globals: destroy the process-wide wakeup fd before the
   wakeup-fd subsystem itself is shut down, then release the TLS keys.
   NOTE(review): grpc_wakeup_fd_global_destroy() appears twice below. This is
   almost certainly diff residue (the removed old line plus its relocated new
   position, shown here without +/- markers) — confirm against the real
   source that only the later call remains. */
void grpc_pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
grpc_wakeup_fd_global_destroy();
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
grpc_wakeup_fd_global_destroy();
}
/* New POSIX implementation added by this commit: wake any blocked poller by
   writing to the process-wide wakeup fd (its read end is included in every
   poll/epoll set elsewhere in this diff). */
void grpc_kick_poller(void) {
grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
/* main interface */
@@ -193,6 +200,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
goto done;
}
if (grpc_alarm_check(exec_ctx, now, &deadline)) {
gpr_mu_unlock(&pollset->mu);
locked = 0;
goto done;
}
if (pollset->shutting_down) {
@@ -294,7 +303,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
/*
@@ -439,7 +448,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
struct pollfd pfd[2];
struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
int timeout;
@@ -452,17 +461,20 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = 1;
pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
pfd[1].events = POLLIN;
pfd[1].revents = 0;
nfds = 2;
if (fd) {
pfd[1].fd = fd->fd;
pfd[1].revents = 0;
pfd[2].fd = fd->fd;
pfd[2].revents = 0;
gpr_mu_unlock(&pollset->mu);
pfd[1].events =
pfd[2].events =
(short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
if (pfd[1].events != 0) {
if (pfd[2].events != 0) {
nfds++;
}
} else {
@@ -477,8 +489,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[1].revents & POLLIN,
pfd[1].revents & POLLOUT);
grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
pfd[2].revents & POLLOUT);
}
if (r < 0) {
@@ -489,13 +501,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
}
if (pfd[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
if (nfds > 1) {
if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
if (nfds > 2) {
if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(exec_ctx, fd);
}
if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(exec_ctx, fd);
}
}

@@ -129,5 +129,6 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */

@@ -34,19 +34,49 @@
#include "test/core/end2end/end2end_tests.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "test/core/end2end/cq_verifier.h"
/* Convert a small integer into the opaque void* tag form used by the
   completion queue API. */
static void *tag(gpr_intptr t) {
  return (void *)t;
}
/* State shared between the test driver and the watcher child thread. */
typedef struct {
gpr_event started; /* set by the child thread once it is running */
grpc_channel *channel; /* client channel under test (assigned, not read in the visible code) */
grpc_completion_queue *cq; /* CQ the child blocks on for the watch result */
} child_events;
/* Child-thread body: announce startup, then block on the completion queue
   until the connectivity watch posted as tag(1) completes. The watch is
   expected to finish with success == 0, i.e. the connectivity state did NOT
   change before its deadline — this is what makes the test depend on timely
   poller wakeups rather than spurious state transitions. */
static void child_thread(void *arg) {
child_events *ce = arg;
grpc_event ev;
/* signal the parent before blocking, so it knows the CQ is being drained */
gpr_event_set(&ce->started, (void*)1);
gpr_log(GPR_DEBUG, "verifying");
ev = grpc_completion_queue_next(ce->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
GPR_ASSERT(ev.tag == tag(1));
GPR_ASSERT(ev.success == 0);
}
static void test_connectivity(grpc_end2end_test_config config) {
grpc_end2end_test_fixture f = config.create_fixture(NULL, NULL);
grpc_connectivity_state state;
cq_verifier *cqv = cq_verifier_create(f.cq);
child_events ce;
gpr_thd_options thdopt = gpr_thd_options_default();
gpr_thd_id thdid;
config.init_client(&f, NULL);
ce.channel = f.client;
ce.cq = f.cq;
gpr_event_init(&ce.started);
gpr_thd_options_set_joinable(&thdopt);
GPR_ASSERT(gpr_thd_new(&thdid, child_thread, &ce, &thdopt));
gpr_event_wait(&ce.started, gpr_inf_future(GPR_CLOCK_MONOTONIC));
/* channels should start life in IDLE, and stay there */
GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 0) ==
GRPC_CHANNEL_IDLE);
@@ -55,18 +85,24 @@ static void test_connectivity(grpc_end2end_test_config config) {
GRPC_CHANNEL_IDLE);
/* start watching for a change */
gpr_log(GPR_DEBUG, "watching");
grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
gpr_now(GPR_CLOCK_MONOTONIC),
f.cq, tag(1));
/* nothing should happen */
cq_verify_empty(cqv);
/* eventually the child thread completion should trigger */
gpr_thd_join(thdid);
/* check that we're still in idle, and start connecting */
GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) ==
GRPC_CHANNEL_IDLE);
/* start watching for a change */
grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_IDLE,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
f.cq, tag(2));
/* and now the watch should trigger */
cq_expect_completion(cqv, tag(1), 1);
cq_expect_completion(cqv, tag(2), 1);
cq_verify(cqv);
state = grpc_channel_check_connectivity_state(f.client, 0);
GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
@@ -75,8 +111,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
/* quickly followed by a transition to TRANSIENT_FAILURE */
grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_CONNECTING,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
f.cq, tag(2));
cq_expect_completion(cqv, tag(2), 1);
f.cq, tag(3));
cq_expect_completion(cqv, tag(3), 1);
cq_verify(cqv);
state = grpc_channel_check_connectivity_state(f.client, 0);
GPR_ASSERT(state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
@@ -93,8 +129,8 @@ static void test_connectivity(grpc_end2end_test_config config) {
READY is reached */
while (state != GRPC_CHANNEL_READY) {
grpc_channel_watch_connectivity_state(
f.client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(3));
cq_expect_completion(cqv, tag(3), 1);
f.client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(4));
cq_expect_completion(cqv, tag(4), 1);
cq_verify(cqv);
state = grpc_channel_check_connectivity_state(f.client, 0);
GPR_ASSERT(state == GRPC_CHANNEL_READY ||
@@ -108,11 +144,11 @@ static void test_connectivity(grpc_end2end_test_config config) {
grpc_channel_watch_connectivity_state(f.client, GRPC_CHANNEL_READY,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3),
f.cq, tag(4));
f.cq, tag(5));
grpc_server_shutdown_and_notify(f.server, f.cq, tag(0xdead));
cq_expect_completion(cqv, tag(4), 1);
cq_expect_completion(cqv, tag(5), 1);
cq_expect_completion(cqv, tag(0xdead), 1);
cq_verify(cqv);
state = grpc_channel_check_connectivity_state(f.client, 0);

Loading…
Cancel
Save