From 084c8089a4ca3f9e1a5516751a9562b902926f3b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 29 Sep 2015 11:19:06 -0700 Subject: [PATCH] Threading fixes - single global mutex (simpler, easier to make correct for now) - properly flag kick state for workers to avoid missing wakeups (if signal is called before wait on the cv) --- src/core/iomgr/pollset_windows.c | 41 +++++++++++++++++--------------- src/core/iomgr/pollset_windows.h | 14 +++++++---- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index acdef9796ce..dd5301f5be9 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -43,12 +43,12 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" -static gpr_mu g_polling_mu; +gpr_mu grpc_polling_mu; static grpc_pollset_worker *g_active_poller; static grpc_pollset_worker g_global_root_worker; void grpc_pollset_global_init() { - gpr_mu_init(&g_polling_mu); + gpr_mu_init(&grpc_polling_mu); g_active_poller = NULL; g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = @@ -56,7 +56,7 @@ void grpc_pollset_global_init() { } void grpc_pollset_global_shutdown() { - gpr_mu_destroy(&g_polling_mu); + gpr_mu_destroy(&grpc_polling_mu); } static void remove_worker(grpc_pollset_worker *worker, @@ -108,7 +108,6 @@ static void push_front_worker(grpc_pollset_worker *root, void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); - gpr_mu_init(&pollset->mu); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = &pollset->root_worker; @@ -116,7 +115,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { @@ -124,11 +123,10 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } else { pollset->on_shutdown = closure; } - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); } void grpc_pollset_destroy(grpc_pollset *pollset) { - gpr_mu_destroy(&pollset->mu); } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -140,29 +138,31 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker->kicked = 0; + worker->pollset = pollset; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { - gpr_mu_lock(&g_polling_mu); if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; g_active_poller = worker; - gpr_mu_unlock(&g_polling_mu); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); - gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&g_polling_mu); + gpr_mu_lock(&grpc_polling_mu); pollset->is_iocp_worker = 0; g_active_poller = NULL; + /* try to get a worker from this pollsets worker list */ + next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + /* try to get a worker from the global list */ next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); if (next_worker != NULL) { + next_worker->kicked = 1; gpr_cv_signal(&next_worker->cv); } - gpr_mu_unlock(&g_polling_mu); if (pollset->shutting_down && pollset->on_shutdown != NULL) { grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); @@ -171,18 +171,21 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); - gpr_mu_unlock(&g_polling_mu); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); added_worker = 1; - gpr_cv_wait(&worker->cv, &pollset->mu, deadline); + while (!worker->kicked) { + if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + break; + } + } } else { pollset->kicked_without_pollers = 0; } done: if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); @@ -197,6 +200,7 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { + specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; @@ -205,12 +209,11 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { } } else { if (p->is_iocp_worker) { - gpr_mu_lock(&g_polling_mu); if (g_active_poller == specific_worker) { grpc_iocp_kick(); } - gpr_mu_unlock(&g_polling_mu); } else { + specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 55f87aca729..65ba80619b7 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -54,20 +54,26 @@ typedef struct grpc_pollset_worker_link { struct grpc_pollset_worker *prev; } grpc_pollset_worker_link; +struct grpc_pollset; +typedef struct grpc_pollset grpc_pollset; + typedef struct grpc_pollset_worker { gpr_cv cv; + int kicked; + struct grpc_pollset *pollset; grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES]; } grpc_pollset_worker; -typedef struct grpc_pollset { - gpr_mu mu; +struct grpc_pollset { int shutting_down; int kicked_without_pollers; int is_iocp_worker; grpc_pollset_worker root_worker; grpc_closure *on_shutdown; -} grpc_pollset; +}; + +extern gpr_mu grpc_polling_mu; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */