From 3dedb70b38683a491ae3d150fdd206adabab8b4e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 24 Sep 2015 10:31:26 -0700 Subject: [PATCH 1/9] Implement turnstile polling for Windows --- src/core/iomgr/iocp_windows.c | 61 +++++++------- src/core/iomgr/iocp_windows.h | 1 + src/core/iomgr/iomgr.c | 2 + src/core/iomgr/iomgr_internal.h | 3 + src/core/iomgr/iomgr_posix.c | 2 - src/core/iomgr/iomgr_posix.h | 3 - src/core/iomgr/pollset_windows.c | 133 ++++++++++++++++++++++++------- src/core/iomgr/pollset_windows.h | 16 +++- 8 files changed, 153 insertions(+), 68 deletions(-) diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index c2f62a41b83..af846f680f7 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -50,13 +50,28 @@ static ULONG g_iocp_kick_token; static OVERLAPPED g_iocp_custom_overlap; -static gpr_event g_shutdown_iocp; -static gpr_event g_iocp_done; static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static void do_iocp_work(grpc_exec_ctx *exec_ctx) { +static DWORD deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return INFINITE; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); +} + +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -66,10 +81,10 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { grpc_winsocket_callback_info *info; grpc_closure *closure = NULL; success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, - &overlapped, INFINITE); - /* success = 0 and overlapped = NULL means the deadline got attained. - Which is impossible. since our wait time is +inf */ - GPR_ASSERT(success || overlapped); + &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + if (success == 0 && overlapped == NULL) { + return; + } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); @@ -109,29 +124,10 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { } } -static void iocp_loop(void *p) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - while (gpr_atm_acq_load(&g_custom_events) || - !gpr_event_get(&g_shutdown_iocp)) { - do_iocp_work(&exec_ctx); - grpc_exec_ctx_flush(&exec_ctx); - } - grpc_exec_ctx_finish(&exec_ctx); - - gpr_event_set(&g_iocp_done, (void *)1); -} - void grpc_iocp_init(void) { - gpr_thd_id id; - g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(g_iocp); - - gpr_event_init(&g_iocp_done); - gpr_event_init(&g_shutdown_iocp); - gpr_thd_new(&id, iocp_loop, NULL, NULL); } void grpc_iocp_kick(void) { @@ -144,12 +140,13 @@ void grpc_iocp_kick(void) { } void grpc_iocp_shutdown(void) { - BOOL success; - gpr_event_set(&g_shutdown_iocp, (void *)1); - grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); - success = CloseHandle(g_iocp); - GPR_ASSERT(success); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (gpr_atm_acq_load(&g_custom_events)) { + grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(CloseHandle(g_iocp)); } void grpc_iocp_add_socket(grpc_winsocket *socket) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index b0209e04e33..7e330e7ce2f 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -38,6 +38,7 @@ #include "src/core/iomgr/socket_windows.h" +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_shutdown(void); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index ef222416af7..fe5c1d4e8fb 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -66,6 +66,7 @@ void grpc_iomgr_init(void) { g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); + grpc_pollset_global_init(); } static size_t count_objects(void) { @@ -138,6 +139,7 @@ void grpc_iomgr_shutdown(void) { gpr_mu_lock(&g_mu); gpr_mu_unlock(&g_mu); + grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index f266732c96b..1a0724b431c 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -43,6 +43,9 @@ typedef struct grpc_iomgr_object { struct grpc_iomgr_object *prev; } grpc_iomgr_object; +void grpc_pollset_global_init(void); +void grpc_pollset_global_shutdown(void); + void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 2425e599415..db93d0a7561 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -42,12 +42,10 @@ void grpc_iomgr_platform_init(void) { grpc_fd_global_init(); - grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); } void grpc_iomgr_platform_shutdown(void) { - grpc_pollset_global_shutdown(); grpc_fd_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h index 716fedb6368..068a5c6d7cf 100644 --- a/src/core/iomgr/iomgr_posix.h +++ b/src/core/iomgr/iomgr_posix.h @@ -36,7 +36,4 @@ #include "src/core/iomgr/iomgr_internal.h" -void grpc_pollset_global_init(void); -void grpc_pollset_global_shutdown(void); - #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 6182eb35328..cb0aad0e33c 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -39,38 +39,66 @@ #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" -static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; +static gpr_mu g_polling_mu; +static grpc_pollset_worker *g_active_poller; +static grpc_pollset_worker g_global_root_worker; + +void grpc_pollset_global_init() { + gpr_mu_init(&g_polling_mu); + g_active_poller = NULL; + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = + &g_global_root_worker; +} + +void grpc_pollset_global_shutdown() { + gpr_mu_destroy(&g_polling_mu); } -static int has_workers(grpc_pollset *p) { - return p->root_worker.next != &p->root_worker; +static void remove_worker(grpc_pollset_worker *worker, + grpc_pollset_worker_link_type type) { + worker->links[type].prev->links[type].next = worker->links[type].next; + worker->links[type].next->links[type].prev = worker->links[type].prev; + worker->links[type].next = worker->links[type].prev = worker; } -static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { - if (has_workers(p)) { - grpc_pollset_worker *w = p->root_worker.next; - remove_worker(p, w); +static int has_workers(grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + return root->links[type].next != root; +} + +static grpc_pollset_worker *pop_front_worker( + grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + if (has_workers(root, type)) { + grpc_pollset_worker *w = root->links[type].next; + remove_worker(w, type); return w; } else { return NULL; } } -static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->next = &p->root_worker; - worker->prev = worker->next->prev; - worker->prev->next = worker->next->prev = worker; +static void push_back_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].next = root; + worker->links[type].prev = worker->links[type].next->links[type].prev; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } -static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev = &p->root_worker; - worker->next = worker->prev->next; - worker->prev->next = worker->next->prev = worker; +static void push_front_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].prev = root; + worker->links[type].next = worker->links[type].prev->links[type].next; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } /* There isn't really any such thing as a pollset under Windows, due to the @@ -81,8 +109,9 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); gpr_mu_init(&pollset->mu); - pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->kicked_without_pollers = 0; + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + &pollset->root_worker; } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -90,8 +119,12 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + if (!pollset->is_iocp_worker) { + grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + } else { + pollset->on_shutdown = closure; + } gpr_mu_unlock(&pollset->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); } void grpc_pollset_destroy(grpc_pollset *pollset) { @@ -102,13 +135,42 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - worker->next = worker->prev = NULL; + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + NULL; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { - push_front_worker(pollset, worker); + gpr_mu_lock(&g_polling_mu); + if (g_active_poller == NULL) { + grpc_pollset_worker *next_worker; + /* become poller */ + pollset->is_iocp_worker = 1; + g_active_poller = worker; + gpr_mu_unlock(&g_polling_mu); + gpr_mu_unlock(&pollset->mu); + grpc_iocp_work(exec_ctx, deadline); + gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&g_polling_mu); + pollset->is_iocp_worker = 0; + g_active_poller = NULL; + next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + if (next_worker != NULL) { + gpr_cv_signal(&next_worker->cv); + } + gpr_mu_unlock(&g_polling_mu); + + if (pollset->shutting_down && pollset->on_shutdown != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); + pollset->on_shutdown = NULL; + } + goto done; + } + push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); + gpr_mu_unlock(&g_polling_mu); + push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); added_worker = 1; gpr_cv_wait(&worker->cv, &pollset->mu, deadline); } else { @@ -122,27 +184,40 @@ done: } gpr_cv_destroy(&worker->cv); if (added_worker) { - remove_worker(pollset, worker); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - for (specific_worker = p->root_worker.next; + for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { + specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; + if (p->is_iocp_worker) { + grpc_iocp_kick(); + } } else { - gpr_cv_signal(&specific_worker->cv); + if (p->is_iocp_worker) { + gpr_mu_lock(&g_polling_mu); + if (g_active_poller == specific_worker) { + grpc_iocp_kick(); + } + gpr_mu_unlock(&g_polling_mu); + } else { + gpr_cv_signal(&specific_worker->cv); + } } } else { - specific_worker = pop_front_worker(p); + specific_worker = pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); if (specific_worker != NULL) { - push_back_worker(p, specific_worker); - gpr_cv_signal(&specific_worker->cv); + grpc_pollset_kick(p, specific_worker); + } else if (p->is_iocp_worker) { + grpc_iocp_kick(); } else { p->kicked_without_pollers = 1; } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 4efa5a1717c..55f87aca729 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -43,17 +43,29 @@ used to synchronize with the IOCP, and workers are condition variables used to block threads until work is ready. */ -typedef struct grpc_pollset_worker { - gpr_cv cv; +typedef enum { + GRPC_POLLSET_WORKER_LINK_POLLSET = 0, + GRPC_POLLSET_WORKER_LINK_GLOBAL, + GRPC_POLLSET_WORKER_LINK_TYPES +} grpc_pollset_worker_link_type; + +typedef struct grpc_pollset_worker_link { struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; +} grpc_pollset_worker_link; + +typedef struct grpc_pollset_worker { + gpr_cv cv; + grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES]; } grpc_pollset_worker; typedef struct grpc_pollset { gpr_mu mu; int shutting_down; int kicked_without_pollers; + int is_iocp_worker; grpc_pollset_worker root_worker; + grpc_closure *on_shutdown; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) From 114bda10906a602907fd9e469b9583a9e219fa47 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 29 Sep 2015 08:37:09 -0700 Subject: [PATCH 2/9] Fix rounding error so IOCP completes on time --- src/core/iomgr/iocp_windows.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index af846f680f7..349440fa88c 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -68,7 +68,7 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, } timeout = gpr_time_sub(deadline, now); return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { From c3e577805d7538d1d5e1f2610387744651b0507f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 29 Sep 2015 08:37:32 -0700 Subject: [PATCH 3/9] Properly initialize variables, destroy variables in usage order --- src/core/iomgr/pollset_windows.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index cb0aad0e33c..acdef9796ce 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -137,6 +137,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int added_worker = 0; worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { @@ -182,11 +184,11 @@ done: grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); } - gpr_cv_destroy(&worker->cv); if (added_worker) { remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } + gpr_cv_destroy(&worker->cv); } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { From 3ccd9613dab3c4de1e679ef2fc8b01ac24d4fbc0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 29 Sep 2015 11:17:54 -0700 Subject: [PATCH 4/9] Ensure GrpcEnvironment setup for these (special) tests --- .../Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index a2ee1832724..c6843f10af2 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -45,17 +45,21 @@ namespace Grpc.Core.Internal.Tests [Test] public void CreateAndDestroy() { + GrpcEnvironment.AddRef(); var cq = CompletionQueueSafeHandle.Create(); cq.Dispose(); + GrpcEnvironment.Release(); } [Test] public void CreateAndShutdown() { + GrpcEnvironment.AddRef(); var cq = CompletionQueueSafeHandle.Create(); cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); + GrpcEnvironment.Release(); Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type); Assert.AreNotEqual(IntPtr.Zero, ev.success); Assert.AreEqual(IntPtr.Zero, ev.tag); From 084c8089a4ca3f9e1a5516751a9562b902926f3b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 29 Sep 2015 11:19:06 -0700 Subject: [PATCH 5/9] Threading fixes - single global mutex (simpler, easier to make correct for now) - properly flag kick state for workers to avoid missing wakeups (if signal is called before wait on the cv) --- src/core/iomgr/pollset_windows.c | 41 +++++++++++++++++--------------- src/core/iomgr/pollset_windows.h | 14 +++++++---- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index acdef9796ce..dd5301f5be9 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -43,12 +43,12 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" -static gpr_mu g_polling_mu; +gpr_mu grpc_polling_mu; static grpc_pollset_worker *g_active_poller; static grpc_pollset_worker g_global_root_worker; void grpc_pollset_global_init() { - gpr_mu_init(&g_polling_mu); + gpr_mu_init(&grpc_polling_mu); g_active_poller = NULL; g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = @@ -56,7 +56,7 @@ void grpc_pollset_global_init() { } void grpc_pollset_global_shutdown() { - gpr_mu_destroy(&g_polling_mu); + gpr_mu_destroy(&grpc_polling_mu); } static void remove_worker(grpc_pollset_worker *worker, @@ -108,7 +108,6 @@ static void push_front_worker(grpc_pollset_worker *root, void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); - gpr_mu_init(&pollset->mu); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = &pollset->root_worker; @@ -116,7 +115,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { @@ -124,11 +123,10 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } else { pollset->on_shutdown = closure; } - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); } void grpc_pollset_destroy(grpc_pollset *pollset) { - gpr_mu_destroy(&pollset->mu); } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -140,29 +138,31 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker->kicked = 0; + worker->pollset = pollset; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { - gpr_mu_lock(&g_polling_mu); if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; g_active_poller = worker; - gpr_mu_unlock(&g_polling_mu); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); - gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&g_polling_mu); + gpr_mu_lock(&grpc_polling_mu); pollset->is_iocp_worker = 0; g_active_poller = NULL; + /* try to get a worker from this pollsets worker list */ + next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + /* try to get a worker from the global list */ next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); if (next_worker != NULL) { + next_worker->kicked = 1; gpr_cv_signal(&next_worker->cv); } - gpr_mu_unlock(&g_polling_mu); if (pollset->shutting_down && pollset->on_shutdown != NULL) { grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); @@ -171,18 +171,21 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); - gpr_mu_unlock(&g_polling_mu); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); added_worker = 1; - gpr_cv_wait(&worker->cv, &pollset->mu, deadline); + while (!worker->kicked) { + if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + break; + } + } } else { pollset->kicked_without_pollers = 0; } done: if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); @@ -197,6 +200,7 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { + specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; @@ -205,12 +209,11 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { } } else { if (p->is_iocp_worker) { - gpr_mu_lock(&g_polling_mu); if (g_active_poller == specific_worker) { grpc_iocp_kick(); } - gpr_mu_unlock(&g_polling_mu); } else { + specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 55f87aca729..65ba80619b7 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -54,20 +54,26 @@ typedef struct grpc_pollset_worker_link { struct grpc_pollset_worker *prev; } grpc_pollset_worker_link; +struct grpc_pollset; +typedef struct grpc_pollset grpc_pollset; + typedef struct grpc_pollset_worker { gpr_cv cv; + int kicked; + struct grpc_pollset *pollset; grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES]; } grpc_pollset_worker; -typedef struct grpc_pollset { - gpr_mu mu; +struct grpc_pollset { int shutting_down; int kicked_without_pollers; int is_iocp_worker; grpc_pollset_worker root_worker; grpc_closure *on_shutdown; -} grpc_pollset; +}; + +extern gpr_mu grpc_polling_mu; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ From 4086474399be1811731fac8bb6755de730ddbc3f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Sep 2015 08:35:31 -0700 Subject: [PATCH 6/9] Make grpc_exec_ctx_flush return a status indicating if work was performed --- src/core/iomgr/exec_ctx.c | 5 ++++- src/core/iomgr/exec_ctx.h | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index a830a27b0ba..f2914d376ef 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -35,16 +35,19 @@ #include -void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { +int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { + int did_something = 0; while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next; + did_something = 1; c->cb(exec_ctx, c->cb_arg, c->success); c = next; } } + return did_something; } void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index f99aa038c59..aa0610cbeaf 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -61,8 +61,9 @@ struct grpc_exec_ctx { { GRPC_CLOSURE_LIST_INIT } /** Flush any work that has been enqueued onto this grpc_exec_ctx. - * Caller must guarantee that no interfering locks are held. */ -void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); + * Caller must guarantee that no interfering locks are held. + * Returns 1 if work was performed, 0 otherwise. */ +int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); From 01be53d1a11c966baf1b1ce66baa60bf763bfc8b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Sep 2015 08:36:03 -0700 Subject: [PATCH 7/9] Add a facility to flush iocp at iomgr shutdown --- src/core/iomgr/iocp_windows.c | 12 +++++++++--- src/core/iomgr/iocp_windows.h | 1 + src/core/iomgr/iomgr.c | 2 ++ src/core/iomgr/iomgr_internal.h | 3 +++ src/core/iomgr/iomgr_posix.c | 3 +++ src/core/iomgr/iomgr_windows.c | 4 ++++ 6 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 349440fa88c..791f2e39c84 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -119,9 +119,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { info->has_pending_iocp = 1; } gpr_mu_unlock(&socket->state_mu); - if (closure) { - closure->cb(exec_ctx, closure->cb_arg, 1); - } + grpc_exec_ctx_enqueue(exec_ctx, closure, 1); } void grpc_iocp_init(void) { @@ -139,6 +137,14 @@ void grpc_iocp_kick(void) { GPR_ASSERT(success); } +void grpc_iocp_flush(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + do { + grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + } while (grpc_exec_ctx_flush(&exec_ctx)); +} + void grpc_iocp_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (gpr_atm_acq_load(&g_custom_events)) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 7e330e7ce2f..75f3ba84770 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -41,6 +41,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); +void grpc_iocp_flush(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 0c067e51877..a10399311fc 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -91,6 +91,8 @@ void grpc_iomgr_shutdown(void) { gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_platform_flush(); + gpr_mu_lock(&g_mu); g_shutdown = 1; while (g_root_object.next != &g_root_object) { diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 1a0724b431c..e372c18e8a0 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -50,6 +50,9 @@ void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); void grpc_iomgr_platform_init(void); +/** flush any globally queued work from iomgr */ +void grpc_iomgr_platform_flush(void); +/** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index db93d0a7561..f6474b7e6d4 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -45,6 +45,9 @@ void grpc_iomgr_platform_init(void) { grpc_register_tracer("tcp", &grpc_tcp_trace); } +void grpc_iomgr_platform_flush(void) { +} + void grpc_iomgr_platform_shutdown(void) { grpc_fd_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c index b49cb87e97b..93bdc5ec16e 100644 --- a/src/core/iomgr/iomgr_windows.c +++ b/src/core/iomgr/iomgr_windows.c @@ -63,6 +63,10 @@ void grpc_iomgr_platform_init(void) { grpc_iocp_init(); } +void grpc_iomgr_platform_flush(void) { + grpc_iocp_flush(); +} + void grpc_iomgr_platform_shutdown(void) { grpc_iocp_shutdown(); winsock_shutdown(); From 0b6312e970a425819a68843e93f72c4d06e31543 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Sep 2015 08:36:27 -0700 Subject: [PATCH 8/9] Flush iocp related work immediately --- src/core/iomgr/pollset_windows.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index dd5301f5be9..798b6376358 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -152,6 +152,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, g_active_poller = worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); + grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&grpc_polling_mu); pollset->is_iocp_worker = 0; g_active_poller = NULL; From 1433791d87ec0c0b50163d8474e49a08d189287c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 30 Sep 2015 08:41:51 -0700 Subject: [PATCH 9/9] Don't wait forever for iocp to shutdown --- src/core/iomgr/iocp_windows.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 791f2e39c84..cf33d74366f 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -141,7 +141,7 @@ void grpc_iocp_flush(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; do { - grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); } while (grpc_exec_ctx_flush(&exec_ctx)); }