From 4bc3463108a7675b24d6f826acf6b2f472a615f5 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 7 Oct 2015 16:12:35 -0700 Subject: [PATCH 1/8] Introducing grpc_executor, for all your threading needs --- BUILD | 6 + Makefile | 2 + build.yaml | 2 + gRPC.podspec | 3 + grpc.gyp | 2 + src/core/iomgr/closure.c | 10 ++ src/core/iomgr/closure.h | 9 ++ src/core/iomgr/executor.c | 149 ++++++++++++++++++ src/core/iomgr/executor.h | 49 ++++++ src/core/iomgr/resolve_address_posix.c | 22 +-- src/core/surface/init.c | 3 + test/core/iomgr/resolve_address_test.c | 6 +- tools/doxygen/Doxyfile.core.internal | 2 + tools/run_tests/sources_and_headers.json | 6 + vsprojects/vcxproj/grpc/grpc.vcxproj | 3 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 6 + .../grpc_unsecure/grpc_unsecure.vcxproj | 3 + .../grpc_unsecure.vcxproj.filters | 6 + 18 files changed, 272 insertions(+), 17 deletions(-) create mode 100644 src/core/iomgr/executor.c create mode 100644 src/core/iomgr/executor.h diff --git a/BUILD b/BUILD index 3e2a45b8a0a..2b21b19aba4 100644 --- a/BUILD +++ b/BUILD @@ -184,6 +184,7 @@ cc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", @@ -319,6 +320,7 @@ cc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/exec_ctx.c", + "src/core/iomgr/executor.c", "src/core/iomgr/fd_posix.c", "src/core/iomgr/iocp_windows.c", "src/core/iomgr/iomgr.c", @@ -470,6 +472,7 @@ cc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", @@ -585,6 +588,7 @@ cc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/exec_ctx.c", + "src/core/iomgr/executor.c", "src/core/iomgr/fd_posix.c", "src/core/iomgr/iocp_windows.c", "src/core/iomgr/iomgr.c", @@ -1109,6 +1113,7 @@ objc_library( "src/core/iomgr/endpoint_pair_posix.c", "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/exec_ctx.c", + "src/core/iomgr/executor.c", "src/core/iomgr/fd_posix.c", "src/core/iomgr/iocp_windows.c", "src/core/iomgr/iomgr.c", @@ -1257,6 +1262,7 @@ objc_library( "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", diff --git a/Makefile b/Makefile index 8eb94f768a2..0fa2863202b 100644 --- a/Makefile +++ b/Makefile @@ -4081,6 +4081,7 @@ LIBGRPC_SRC = \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/exec_ctx.c \ + src/core/iomgr/executor.c \ src/core/iomgr/fd_posix.c \ src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ @@ -4363,6 +4364,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/exec_ctx.c \ + src/core/iomgr/executor.c \ src/core/iomgr/fd_posix.c \ src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ diff --git a/build.yaml b/build.yaml index 98fb0348ca3..6879237ce89 100644 --- a/build.yaml +++ b/build.yaml @@ -144,6 +144,7 @@ filegroups: - src/core/iomgr/endpoint.h - src/core/iomgr/endpoint_pair.h - src/core/iomgr/exec_ctx.h + - src/core/iomgr/executor.h - src/core/iomgr/fd_posix.h - src/core/iomgr/iocp_windows.h - src/core/iomgr/iomgr.h @@ -256,6 +257,7 @@ filegroups: - src/core/iomgr/endpoint_pair_posix.c - src/core/iomgr/endpoint_pair_windows.c - src/core/iomgr/exec_ctx.c + - src/core/iomgr/executor.c - src/core/iomgr/fd_posix.c - src/core/iomgr/iocp_windows.c - src/core/iomgr/iomgr.c diff --git a/gRPC.podspec b/gRPC.podspec index 717e7005ee5..2c035620f6c 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -188,6 +188,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/exec_ctx.h', + 'src/core/iomgr/executor.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', @@ -330,6 +331,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/exec_ctx.c', + 'src/core/iomgr/executor.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', @@ -479,6 +481,7 @@ Pod::Spec.new do |s| 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/exec_ctx.h', + 'src/core/iomgr/executor.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', diff --git a/grpc.gyp b/grpc.gyp index 2225ca4e045..fb83f911ae6 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -210,6 +210,7 @@ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/exec_ctx.c', + 'src/core/iomgr/executor.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', @@ -396,6 +397,7 @@ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/exec_ctx.c', + 'src/core/iomgr/executor.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index 32654257892..1e3cad00906 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -69,3 +69,13 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { } src->head = src->tail = NULL; } + +grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) { + grpc_closure *head; + if (list->head == NULL) { + return NULL; + } + head = list->head; + list->head = list->head->next; + return head; +} diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 982ffa4e1be..709ef1afe93 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -80,9 +80,18 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } +/** add \a closure to the end of \a list and set \a closure's success to \a + * success */ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, int success); + +/** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); + +/** pop (return and remove) the head closure from \a list. */ +grpc_closure *grpc_closure_list_pop(grpc_closure_list *list); + +/** return whether \a list is empty. */ int grpc_closure_list_empty(grpc_closure_list list); #endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */ diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c new file mode 100644 index 00000000000..d80192e5c09 --- /dev/null +++ b/src/core/iomgr/executor.c @@ -0,0 +1,149 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/executor.h" + +#include + +#include +#include +#include +#include +#include "src/core/iomgr/exec_ctx.h" + +typedef struct grpc_executor_data { + int busy; /**< is the thread currently running? */ + int shutting_down; /**< has \a grpc_shutdown() been invoked? */ + int pending_join; /**< has the thread finished but not been joined? */ + grpc_closure_list closures; /**< collection of pending work */ + gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a + pending_join are true */ + gpr_thd_options options; + gpr_mu mu; +} grpc_executor; + +static grpc_executor g_executor; + +void grpc_executor_init() { + memset(&g_executor, 0, sizeof(grpc_executor)); + gpr_mu_init(&g_executor.mu); + g_executor.options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&g_executor.options); +} + +/* thread body */ +static void closure_exec_thread_func(void *ignored) { + grpc_closure *closure; + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (1) { + gpr_mu_lock(&g_executor.mu); + if (g_executor.shutting_down != 0) { + gpr_mu_unlock(&g_executor.mu); + break; + } + closure = grpc_closure_list_pop(&g_executor.closures); + if (closure == NULL) { + /* no more work, time to die */ + GPR_ASSERT(g_executor.busy == 1); + g_executor.busy = 0; + gpr_mu_unlock(&g_executor.mu); + break; + } + gpr_mu_unlock(&g_executor.mu); + closure->cb(&exec_ctx, closure->cb_arg, closure->success); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +/* Spawn the thread if new work has arrived a no thread is up */ +static void maybe_spawn_locked() { + if (grpc_closure_list_empty(g_executor.closures) == 1) { + return; + } + if (g_executor.shutting_down == 1) { + return; + } + + if (g_executor.busy != 0) { + /* Thread still working. New work will be picked up by already running + * thread. Not spawning anything. */ + return; + } else if (g_executor.pending_join != 0) { + /* Pickup the remains of the previous incarnations of the thread. */ + gpr_thd_join(g_executor.tid); + g_executor.pending_join = 0; + } + + /* All previous instances of the thread should have been joined at this point. + * Spawn time! */ + g_executor.busy = 1; + gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL, + &g_executor.options); + g_executor.pending_join = 1; +} + +void grpc_executor_enqueue(grpc_closure *closure, int success) { + gpr_mu_lock(&g_executor.mu); + if (g_executor.shutting_down == 0) { + grpc_closure_list_add(&g_executor.closures, closure, success); + maybe_spawn_locked(); + } + gpr_mu_unlock(&g_executor.mu); +} + +void grpc_executor_shutdown() { + gpr_thd_id tid; + int pending_join; + grpc_closure *closure; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + gpr_mu_lock(&g_executor.mu); + pending_join = g_executor.pending_join; + tid = g_executor.tid; + g_executor.shutting_down = 1; + gpr_mu_unlock(&g_executor.mu); + /* we can release the lock at this point despite the access to the closure + * list below because we aren't accepting new work */ + + /* Execute pending callbacks, some may be performing cleanups */ + while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) { + closure->cb(&exec_ctx, closure->cb_arg, closure->success); + } + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); + if (pending_join) { + gpr_thd_join(g_executor.tid); + } + gpr_mu_destroy(&g_executor.mu); +} diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h new file mode 100644 index 00000000000..89da479fa11 --- /dev/null +++ b/src/core/iomgr/executor.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H +#define GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H + +#include "src/core/iomgr/closure.h" + +/** initialize the global executor */ +void grpc_executor_init(); + +/** enqueue \a closure for its eventual execution of \a f(arg) on a separate + * thread */ +void grpc_executor_enqueue(grpc_closure *closure, int success); + +/** shutdown the executor, running all pending work as part of the call */ +void grpc_executor_shutdown(); + +#endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index ed0a93fcc90..555c74ce7e3 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,6 +41,7 @@ #include #include +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/support/block_annotate.h" @@ -57,8 +58,8 @@ typedef struct { char *name; char *default_port; grpc_resolve_cb cb; + grpc_closure request_closure; void *arg; - grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -149,20 +150,18 @@ done: return addrs; } -/* Thread function to asynch-ify grpc_blocking_resolve_address */ -static void do_request_thread(void *rp) { +/* Callback to be passed to grpc_executor to asynch-ify + * grpc_blocking_resolve_address */ +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { request *r = rp; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); void *arg = r->arg; grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - cb(&exec_ctx, arg, resolved); - grpc_iomgr_unregister_object(&r->iomgr_object); + cb(exec_ctx, arg, resolved); gpr_free(r); - grpc_exec_ctx_finish(&exec_ctx); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -173,17 +172,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - gpr_thd_id id; - char *tmp; - gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name, - default_port); - grpc_iomgr_register_object(&r->iomgr_object, tmp); - gpr_free(tmp); + grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - gpr_thd_new(&id, do_request_thread, r, NULL); + grpc_executor_enqueue(&r->request_closure, 1); } #endif diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 95011cab178..a1687e73f37 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" #include "src/core/surface/api_trace.h" @@ -108,6 +109,7 @@ void grpc_init(void) { grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); + grpc_executor_init(); grpc_tracer_init("GRPC_TRACE"); /* Only initialize census if noone else has. */ if (census_enabled() == CENSUS_FEATURE_NONE) { @@ -132,6 +134,7 @@ void grpc_shutdown(void) { gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); + grpc_executor_shutdown(); census_shutdown(); grpc_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index 1c95a9960e0..a516a2595d1 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -32,7 +32,7 @@ */ #include "src/core/iomgr/resolve_address.h" -#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/executor.h" #include #include #include @@ -125,7 +125,7 @@ static void test_unparseable_hostports(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_iomgr_init(); + grpc_executor_init(); test_localhost(); test_default_port(); test_missing_default_port(); @@ -133,6 +133,6 @@ int main(int argc, char **argv) { test_ipv6_without_port(); test_invalid_ip_addresses(); test_unparseable_hostports(); - grpc_iomgr_shutdown(); + grpc_executor_shutdown(); return 0; } diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 5658a102d75..b782978a2a6 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -819,6 +819,7 @@ src/core/iomgr/closure.h \ src/core/iomgr/endpoint.h \ src/core/iomgr/endpoint_pair.h \ src/core/iomgr/exec_ctx.h \ +src/core/iomgr/executor.h \ src/core/iomgr/fd_posix.h \ src/core/iomgr/iocp_windows.h \ src/core/iomgr/iomgr.h \ @@ -954,6 +955,7 @@ src/core/iomgr/endpoint.c \ src/core/iomgr/endpoint_pair_posix.c \ src/core/iomgr/endpoint_pair_windows.c \ src/core/iomgr/exec_ctx.c \ +src/core/iomgr/executor.c \ src/core/iomgr/fd_posix.c \ src/core/iomgr/iocp_windows.c \ src/core/iomgr/iomgr.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 1ceff15a3be..af6a81bfd5c 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -12317,6 +12317,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", @@ -12499,6 +12500,8 @@ "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.c", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.c", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.c", @@ -12822,6 +12825,7 @@ "src/core/iomgr/endpoint.h", "src/core/iomgr/endpoint_pair.h", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.h", "src/core/iomgr/iomgr.h", @@ -12989,6 +12993,8 @@ "src/core/iomgr/endpoint_pair_windows.c", "src/core/iomgr/exec_ctx.c", "src/core/iomgr/exec_ctx.h", + "src/core/iomgr/executor.c", + "src/core/iomgr/executor.h", "src/core/iomgr/fd_posix.c", "src/core/iomgr/fd_posix.h", "src/core/iomgr/iocp_windows.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 183edbc05bf..8850bf469d1 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -305,6 +305,7 @@ + @@ -503,6 +504,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 66ce9ca05be..3e8ca92b988 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -184,6 +184,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -626,6 +629,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index b527179f9f3..4526c2e332c 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -284,6 +284,7 @@ + @@ -442,6 +443,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 7be3c9ec93b..8ba784e9338 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -124,6 +124,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr @@ -524,6 +527,9 @@ src\core\iomgr + + src\core\iomgr + src\core\iomgr From 7137a84be8def54b819246c2c6907812b66fc749 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 7 Oct 2015 16:25:45 -0700 Subject: [PATCH 2/8] Moar comments --- src/core/iomgr/executor.h | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h index 89da479fa11..6da446ae9c6 100644 --- a/src/core/iomgr/executor.h +++ b/src/core/iomgr/executor.h @@ -36,14 +36,18 @@ #include "src/core/iomgr/closure.h" -/** initialize the global executor */ +/** Initialize the global executor. + * + * This mechanism is meant to outsource work (grpc_closure instances) to a + * thread, for those cases where blocking isn't an option but there isn't a + * non-blocking solution available. */ void grpc_executor_init(); -/** enqueue \a closure for its eventual execution of \a f(arg) on a separate +/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate * thread */ void grpc_executor_enqueue(grpc_closure *closure, int success); -/** shutdown the executor, running all pending work as part of the call */ +/** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(); #endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */ From 7d8ea7898025f852cb837c120eab6df7796ee7d2 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 7 Oct 2015 19:10:35 -0700 Subject: [PATCH 3/8] fixed outdated grpc.gyp --- grpc.gyp | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc.gyp b/grpc.gyp index 08185ac592e..bb8a7423ef4 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -202,6 +202,7 @@ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/exec_ctx.c', + 'src/core/iomgr/executor.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', From 249e9bf1164104e543d22c60dcb193d7930bd22e Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 12 Oct 2015 21:17:47 -0700 Subject: [PATCH 4/8] Flush exec_ctx work within loop --- src/core/iomgr/executor.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index d80192e5c09..94e5114dd59 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -82,6 +82,7 @@ static void closure_exec_thread_func(void *ignored) { } gpr_mu_unlock(&g_executor.mu); closure->cb(&exec_ctx, closure->cb_arg, closure->success); + grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } From 63e72a7008196b43564c8b1ff2c042d590e41e3f Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 12 Oct 2015 22:07:32 -0700 Subject: [PATCH 5/8] regenerated projects --- binding.gyp | 1 + 1 file changed, 1 insertion(+) diff --git a/binding.gyp b/binding.gyp index 392835c7721..384c4449a36 100644 --- a/binding.gyp +++ b/binding.gyp @@ -172,6 +172,7 @@ 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/exec_ctx.c', + 'src/core/iomgr/executor.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', From 9e76e7aa3a0d6a8fe95b159177c839f32f3acabb Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 13 Oct 2015 07:52:54 -0700 Subject: [PATCH 6/8] removed unused local var --- src/core/iomgr/executor.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index 94e5114dd59..457e5cdbac6 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -124,14 +124,12 @@ void grpc_executor_enqueue(grpc_closure *closure, int success) { } void grpc_executor_shutdown() { - gpr_thd_id tid; int pending_join; grpc_closure *closure; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_executor.mu); pending_join = g_executor.pending_join; - tid = g_executor.tid; g_executor.shutting_down = 1; gpr_mu_unlock(&g_executor.mu); /* we can release the lock at this point despite the access to the closure From 706b70f4986df8b69a680d1feb8c6bb4d585bd03 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 13 Oct 2015 11:19:49 -0700 Subject: [PATCH 7/8] Updated Windows resolver --- src/core/iomgr/resolve_address_windows.c | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 82a56029967..6dc5de1660a 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -40,6 +40,7 @@ #include #include +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/support/block_annotate.h" @@ -55,8 +56,8 @@ typedef struct { char *name; char *default_port; grpc_resolve_cb cb; + grpc_closure request_closure; void *arg; - grpc_iomgr_object iomgr_object; } request; grpc_resolved_addresses *grpc_blocking_resolve_address( @@ -129,9 +130,9 @@ done: return addrs; } -/* Thread function to asynch-ify grpc_blocking_resolve_address */ -static void do_request(void *rp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; +/* Callback to be passed to grpc_executor to asynch-ify + * grpc_blocking_resolve_address */ +static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) { request *r = rp; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); @@ -139,10 +140,8 @@ static void do_request(void *rp) { grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - grpc_iomgr_unregister_object(&r->iomgr_object); + cb(exec_ctx, arg, resolved); gpr_free(r); - cb(&exec_ctx, arg, resolved); - grpc_exec_ctx_finish(&exec_ctx); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -153,16 +152,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); - gpr_thd_id id; - char *label; - gpr_asprintf(&label, "resolve:%s", name); - grpc_iomgr_register_object(&r->iomgr_object, label); - gpr_free(label); + grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - gpr_thd_new(&id, do_request, r, NULL); + grpc_executor_enqueue(&r->request_closure, 1); } #endif From 661ad7fc85b0603c9e554b82851c102b84375074 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 13 Oct 2015 15:51:31 -0700 Subject: [PATCH 8/8] We need the iomgr_init for winsocks initialization. Also fixed (thanks @nicolasnoble) wrong construction of error msg. --- src/core/iomgr/resolve_address_windows.c | 5 ++++- test/core/iomgr/resolve_address_test.c | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 6dc5de1660a..007c855d109 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -94,7 +95,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( s = getaddrinfo(host, port, &hints, &result); GRPC_SCHEDULING_END_BLOCKING_REGION; if (s != 0) { - gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s)); + char *error_message = gpr_format_message(s); + gpr_log(GPR_ERROR, "getaddrinfo: %s", error_message); + gpr_free(error_message); goto done; } diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index a516a2595d1..56ce091a887 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -126,6 +126,7 @@ static void test_unparseable_hostports(void) { int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_executor_init(); + grpc_iomgr_init(); test_localhost(); test_default_port(); test_missing_default_port(); @@ -133,6 +134,7 @@ int main(int argc, char **argv) { test_ipv6_without_port(); test_invalid_ip_addresses(); test_unparseable_hostports(); + grpc_iomgr_shutdown(); grpc_executor_shutdown(); return 0; }