Better error handling and add polling_island_unlock_pair() helper

pull/6803/head
Sree Kuchibhotla 9 years ago
parent 05fd4fe043
commit 20d0a167be
  1. 274
      src/core/lib/iomgr/ev_epoll_linux.c

@ -237,6 +237,19 @@ struct grpc_pollset_set {
grpc_fd **fds;
};
/*******************************************************************************
* Common helpers
*/
static void append_error(grpc_error **composite, grpc_error *error,
const char *desc) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE(desc);
}
*composite = grpc_error_add_child(*composite, error);
}
/*******************************************************************************
* Polling island Definitions
*/
@ -316,10 +329,13 @@ long pi_unref(polling_island *pi, int ref_cnt) {
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
size_t fd_count, bool add_fd_refs) {
size_t fd_count, bool add_fd_refs,
grpc_error **error) {
int err;
size_t i;
struct epoll_event ev;
char *err_msg;
const char *err_desc = "polling_island_add_fds";
#ifdef GRPC_TSAN
/* See the definition of g_epoll_sync for more context */
@ -333,10 +349,12 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
if (err < 0) {
if (errno != EEXIST) {
/* TODO: sreek - We need a better way to bubble up this error instead of
just logging a message */
gpr_log(GPR_ERROR, "epoll_ctl add for fd: %d failed with error: %s",
fds[i]->fd, strerror(errno));
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
continue;
@ -356,37 +374,47 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island *pi,
grpc_wakeup_fd *wakeup_fd) {
grpc_wakeup_fd *wakeup_fd,
grpc_error **error) {
struct epoll_event ev;
int err;
char *err_msg;
const char *err_desc = "polling_island_add_wakeup_fd";
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = wakeup_fd;
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
if (err < 0) {
gpr_log(GPR_ERROR,
"Failed to add grpc_wake_up_fd (%d) to the epoll set (epoll_fd: %d)"
". Error: %s",
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), pi->epoll_fd,
if (err < 0 && errno != EEXIST) {
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)",
pi->epoll_fd,
GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
}
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island *pi,
bool remove_fd_refs) {
bool remove_fd_refs,
grpc_error **error) {
int err;
size_t i;
char *err_msg;
const char *err_desc = "polling_island_remove_fds";
for (i = 0; i < pi->fd_cnt; i++) {
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
if (err < 0 && errno != ENOENT) {
/* TODO: sreek - We need a better way to bubble up this error instead of
* just logging a message */
gpr_log(GPR_ERROR,
"epoll_ctl deleting fds[%zu]: %d failed with error: %s", i,
pi->fds[i]->fd, strerror(errno));
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
"error: %d (%s)",
pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
if (remove_fd_refs) {
@ -399,17 +427,24 @@ static void polling_island_remove_all_fds_locked(polling_island *pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
bool is_fd_closed) {
bool is_fd_closed,
grpc_error **error) {
int err;
size_t i;
char *err_msg;
const char *err_desc = "polling_island_remove_fd";
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
if (!is_fd_closed) {
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
if (err < 0 && errno != ENOENT) {
gpr_log(GPR_ERROR, "epoll_ctl deleting fd: %d failed with error; %s",
fd->fd, strerror(errno));
gpr_asprintf(
&err_msg,
"epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
pi->epoll_fd, fd->fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
}
@ -422,8 +457,12 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
}
}
static polling_island *polling_island_create(grpc_fd *initial_fd) {
/* Might return NULL in case of an error */
static polling_island *polling_island_create(grpc_fd *initial_fd,
grpc_error **error) {
polling_island *pi = NULL;
char *err_msg;
const char *err_desc = "polling_island_create";
/* Try to get one from the polling island freelist */
gpr_mu_lock(&g_pi_freelist_mu);
@ -449,23 +488,23 @@ static polling_island *polling_island_create(grpc_fd *initial_fd) {
pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (pi->epoll_fd < 0) {
gpr_log(GPR_ERROR, "epoll_create1() failed with error: %s",
gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno,
strerror(errno));
}
GPR_ASSERT(pi->epoll_fd >= 0);
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd);
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
} else {
polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
pi->next_free = NULL;
if (initial_fd != NULL) {
/* Lock the polling island here just in case we got this structure from the
freelist and the polling island lock was not released yet (by the code
that adds the polling island to the freelist) */
/* Lock the polling island here just in case we got this structure from
the freelist and the polling island lock was not released yet (by the
code that adds the polling island to the freelist) */
gpr_mu_lock(&pi->mu);
polling_island_add_fds_locked(pi, &initial_fd, 1, true);
polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
gpr_mu_unlock(&pi->mu);
}
}
return pi;
}
@ -534,7 +573,9 @@ static polling_island *polling_island_lock(polling_island *pi) {
return pi;
}
/* Gets the lock on the *latest* polling islands pointed by *p and *q.
/* Gets the lock on the *latest* polling islands in the linked lists pointed by
*p and *q (and also updates *p and *q to point to the latest polling islands)
This function is needed because calling the following block of code to obtain
locks on polling islands (*p and *q) is prone to deadlocks.
{
@ -550,18 +591,8 @@ static polling_island *polling_island_lock(polling_island *pi) {
..
.. Critical section with both p1 and p2 locked
..
// Release locks
// **IMPORTANT**: Make sure you check p1 == p2 AFTER the function
// polling_island_lock_pair() was called and if so, release the lock only
// once. Note: Even if p1 != p2 beforec calling polling_island_lock_pair(),
// they might be after the function returns:
if (p1 == p2) {
gpr_mu_unlock(&p1->mu)
} else {
gpr_mu_unlock(&p1->mu);
gpr_mu_unlock(&p2->mu);
}
// Release locks: Always call polling_island_unlock_pair() to release locks
polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island **p, polling_island **q) {
polling_island *pi_1 = *p;
@ -623,39 +654,46 @@ static void polling_island_lock_pair(polling_island **p, polling_island **q) {
*q = pi_2;
}
static polling_island *polling_island_merge(polling_island *p,
polling_island *q) {
/* Get locks on both the polling islands */
polling_island_lock_pair(&p, &q);
static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
if (p == q) {
/* Nothing needs to be done here */
gpr_mu_unlock(&p->mu);
return p;
} else {
gpr_mu_unlock(&p->mu);
gpr_mu_unlock(&q->mu);
}
}
static polling_island *polling_island_merge(polling_island *p,
polling_island *q,
grpc_error **error) {
/* Get locks on both the polling islands */
polling_island_lock_pair(&p, &q);
if (p != q) {
/* Make sure that p points to the polling island with fewer fds than q */
if (p->fd_cnt > q->fd_cnt) {
GPR_SWAP(polling_island *, p, q);
}
/* "Merge" p with q i.e move all the fds from p (The one with fewer fds) to q
Note that the refcounts on the fds being moved will not change here. This
is why the last parameter in the following two functions is 'false') */
polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false);
polling_island_remove_all_fds_locked(p, false);
/* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
Note that the refcounts on the fds being moved will not change here.
This is why the last param in the following two functions is 'false') */
polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
polling_island_remove_all_fds_locked(p, false, error);
/* Wakeup all the pollers (if any) on p so that they can pickup this change */
polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd);
/* Wakeup all the pollers (if any) on p so that they pickup this change */
polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);
/* Add the 'merged_to' link from p --> q */
gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
}
/* else if p == q, nothing needs to be done */
gpr_mu_unlock(&p->mu);
gpr_mu_unlock(&q->mu);
polling_island_unlock_pair(p, q);
/* Return the merged polling island */
/* Return the merged polling island (Note that no merge would have happened
if p == q which is ok) */
return q;
}
@ -853,6 +891,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
const char *reason) {
bool is_fd_closed = false;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->mu);
fd->on_done_closure = on_done;
@ -882,7 +922,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_lock(&fd->pi_mu);
if (fd->polling_island != NULL) {
polling_island *pi_latest = polling_island_lock(fd->polling_island);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed);
polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
PI_UNREF(fd->polling_island, "fd_orphan");
@ -890,10 +930,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
gpr_mu_unlock(&fd->pi_mu);
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
}
static grpc_error *fd_shutdown_error(bool shutdown) {
@ -1062,19 +1103,12 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
static void kick_append_error(grpc_error **composite, grpc_error *error) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE("Kick Failure");
}
*composite = grpc_error_add_child(*composite, error);
}
/* p->mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
GPR_TIMER_BEGIN("pollset_kick", 0);
grpc_error *error = GRPC_ERROR_NONE;
const char *err_desc = "Kick Failure";
grpc_pollset_worker *worker = specific_worker;
if (worker != NULL) {
@ -1084,7 +1118,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
for (worker = p->root_worker.next; worker != &p->root_worker;
worker = worker->next) {
if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
kick_append_error(&error, pollset_worker_kick(worker));
append_error(&error, pollset_worker_kick(worker), err_desc);
}
}
} else {
@ -1094,7 +1128,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
} else {
GPR_TIMER_MARK("kicked_specifically", 0);
if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
kick_append_error(&error, pollset_worker_kick(worker));
append_error(&error, pollset_worker_kick(worker), err_desc);
}
}
} else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
@ -1110,7 +1144,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
if (worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, worker);
kick_append_error(&error, pollset_worker_kick(worker));
append_error(&error, pollset_worker_kick(worker), err_desc);
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
p->kicked_without_pollers = true;
@ -1238,23 +1272,17 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset_release_polling_island(pollset, "ps_reset");
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
if (error == GRPC_ERROR_NONE) return;
if (*composite == GRPC_ERROR_NONE) {
*composite = GRPC_ERROR_CREATE("pollset_work");
}
*composite = grpc_error_add_child(*composite, error);
}
#define GRPC_EPOLL_MAX_EVENTS 1000
static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
int timeout_ms, sigset_t *sig_mask) {
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset, int timeout_ms,
sigset_t *sig_mask, grpc_error **error) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1;
int ep_rv;
polling_island *pi = NULL;
grpc_error *error = GRPC_ERROR_NONE;
char *err_msg;
const char *err_desc = "pollset_work_and_unlock";
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@ -1265,11 +1293,15 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
which we got the epoll_fd) got merged with another island while we are
in this function. This is still okay because in such a case, we will wakeup
right-away from epoll_wait() and pick up the latest polling_island the next
this function (i.e pollset_work_and_unlock()) is called.
*/
this function (i.e pollset_work_and_unlock()) is called */
if (pollset->polling_island == NULL) {
pollset->polling_island = polling_island_create(NULL);
pollset->polling_island = polling_island_create(NULL, error);
if (pollset->polling_island == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
return; /* Fatal error. We cannot continue */
}
PI_ADD_REF(pollset->polling_island, "ps");
}
@ -1297,8 +1329,10 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
sig_mask);
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
gpr_asprintf(&err_msg,
"epoll_wait() epoll fd: %d failed with error: %d (%s)",
epoll_fd, errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
} else {
/* We were interrupted. Save an interation by doing a zero timeout
epoll_wait to see if there are any other events of interest */
@ -1314,8 +1348,9 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
if (data_ptr == &grpc_global_wakeup_fd) {
work_combine_error(
&error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
append_error(error,
grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
err_desc);
} else if (data_ptr == &polling_island_wakeup_fd) {
/* This means that our polling island is merged with a different
island. We do not have to do anything here since the subsequent call
@ -1346,7 +1381,6 @@ static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_UNREF(pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
return error;
}
/* pollset->mu lock must be held by the caller before calling this.
@ -1368,6 +1402,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker.pt_id = pthread_self();
*worker_hdl = &worker;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
@ -1379,14 +1414,37 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
pollset->kicked_without_pollers = 0;
} else if (!pollset->shutting_down) {
/* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
(i.e 'kicking') a worker in the pollset.
A 'kick' is a way to inform that worker that there is some pending work
that needs immediate attention (like an event on the completion queue,
or a polling island merge that results in a new epoll-fd to wait on) and
that the worker should not spend time waiting in epoll_pwait().
A kick can come at anytime (i.e before/during or after the worker calls
epoll_pwait()) but in all cases we have to make sure that when a worker
gets a kick, it does not spend time in epoll_pwait(). In other words, one
kick should result in skipping/exiting of one epoll_pwait();
To accomplish this, we mask 'grpc_wakeup_signal' on this worker at all
times *except* when it is in epoll_pwait(). This way, the worker never
misses acting on a kick */
sigemptyset(&new_mask);
sigaddset(&new_mask, grpc_wakeup_signal);
pthread_sigmask(SIG_BLOCK, &new_mask, &orig_mask);
sigdelset(&orig_mask, grpc_wakeup_signal);
/* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'. This is
the mask used at all times *except during epoll_wait()*"
orig_mask: The thread mask which allows 'grpc_wakeup_signal' and this is
the mask to use *during epoll_wait()*
The new_mask is set on the worker before it is added to the pollset (i.e
before it can be kicked) */
push_front_worker(pollset, &worker);
push_front_worker(pollset, &worker); /* Add worker to pollset */
error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask, &error);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@ -1412,15 +1470,20 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
*worker_hdl = NULL;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
GPR_TIMER_END("pollset_work", 0);
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&pollset->mu);
gpr_mu_lock(&fd->pi_mu);
@ -1443,19 +1506,23 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
pi_new = polling_island_create(fd);
pi_new = polling_island_create(fd, &error);
}
} else if (fd->polling_island == NULL) {
pi_new = polling_island_lock(pollset->polling_island);
polling_island_add_fds_locked(pi_new, &fd, 1, true);
polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
gpr_mu_unlock(&pi_new->mu);
} else if (pollset->polling_island == NULL) {
pi_new = polling_island_lock(fd->polling_island);
gpr_mu_unlock(&pi_new->mu);
} else {
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island);
pi_new = polling_island_merge(fd->polling_island, pollset->polling_island,
&error);
}
/* At this point, pi_new is the polling island that both fd->polling_island
and pollset->polling_island must be pointing to */
if (fd->polling_island != pi_new) {
PI_ADD_REF(pi_new, "fd");
if (fd->polling_island != NULL) {
@ -1645,13 +1712,10 @@ bool grpc_are_polling_islands_equal(void *p, void *q) {
polling_island *p1 = p;
polling_island *p2 = q;
/* Note: polling_island_lock_pair() may change p1 and p2 to point to the
latest polling islands in their respective linked lists */
polling_island_lock_pair(&p1, &p2);
if (p1 == p2) {
gpr_mu_unlock(&p1->mu);
} else {
gpr_mu_unlock(&p1->mu);
gpr_mu_unlock(&p2->mu);
}
polling_island_unlock_pair(p1, p2);
return p1 == p2;
}

Loading…
Cancel
Save