diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 06186403e53..575ee02249b 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -104,7 +104,7 @@ void pf_destroy(grpc_lb_policy *pol) { grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); - grpc_workqueue_unref(p->workqueue); + GRPC_WORKQUEUE_UNREF(p->workqueue, "pick_first"); gpr_free(p); } @@ -331,7 +331,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; p->workqueue = args->workqueue; - grpc_workqueue_ref(p->workqueue); + GRPC_WORKQUEUE_REF(p->workqueue, "pick_first"); grpc_connectivity_state_init(&p->state_tracker, args->workqueue, GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index cbc96ca9923..66a8c9f99df 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -197,7 +197,7 @@ static void dns_destroy(grpc_resolver *gr) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); - grpc_workqueue_unref(r->workqueue); + GRPC_WORKQUEUE_UNREF(r->workqueue, "dns"); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r->lb_policy_name); @@ -227,7 +227,7 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); r->workqueue = args->workqueue; - grpc_workqueue_ref(r->workqueue); + GRPC_WORKQUEUE_REF(r->workqueue, "dns"); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index cb3b6070f27..abfb7b8569c 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -157,7 +157,7 @@ static void sockaddr_destroy(grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); - grpc_workqueue_unref(r->workqueue); + GRPC_WORKQUEUE_UNREF(r->workqueue, "sockaddr"); gpr_free(r->addrs); gpr_free(r->addrs_len); gpr_free(r->lb_policy_name); @@ -331,7 +331,7 @@ static grpc_resolver *sockaddr_create( r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); r->workqueue = args->workqueue; - grpc_workqueue_ref(r->workqueue); + GRPC_WORKQUEUE_REF(r->workqueue, "sockaddr"); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 99479b79c51..2047d2fee75 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -260,7 +260,7 @@ static void subchannel_destroy(grpc_subchannel *c) { grpc_mdctx_unref(c->mdctx); grpc_connectivity_state_destroy(&c->state_tracker); grpc_connector_unref(c->connector); - grpc_workqueue_unref(c->workqueue); + GRPC_WORKQUEUE_UNREF(c->workqueue, "subchannel"); gpr_free(c); } @@ -298,7 +298,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->mdctx = args->mdctx; c->master = args->master; c->workqueue = grpc_channel_get_workqueue(c->master); - grpc_workqueue_ref(c->workqueue); + GRPC_WORKQUEUE_REF(c->workqueue, "subchannel"); c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_mdctx_ref(c->mdctx); diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 4bfe3cf9738..75f3f7db4b1 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -106,7 +106,7 @@ static void finish(internal_request *req, int success) { grpc_iomgr_unregister_object(&req->iomgr_obj); gpr_slice_buffer_destroy(&req->incoming); gpr_slice_buffer_destroy(&req->outgoing); - grpc_workqueue_unref(req->workqueue); + GRPC_WORKQUEUE_UNREF(req->workqueue, "destroy"); gpr_free(req); } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 69518597d57..5bdce0bfd8a 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -72,7 +72,7 @@ static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { if (fd->workqueue->wakeup_read_fd != fd) { - grpc_workqueue_unref(fd->workqueue); + GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd"); } gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -167,7 +167,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) { /* if the wakeup_read_fd is NULL, then the workqueue is under construction ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */ if (workqueue->wakeup_read_fd != NULL) { - grpc_workqueue_ref(workqueue); + GRPC_WORKQUEUE_REF(workqueue, "fd"); } grpc_iomgr_register_object(&r->iomgr_object, name); return r; diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 5d1fc687673..67eff3e5285 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -126,8 +126,6 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - memset(&g_root_object, 0, sizeof(g_root_object)); - grpc_alarm_list_shutdown(); grpc_iomgr_platform_shutdown(); diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 02d37350f75..c6c716e4dfd 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -151,7 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) { gpr_mu_destroy(&s->mu); gpr_free(s->ports); - grpc_workqueue_unref(s->workqueue); + GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy"); gpr_free(s); } diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 96688054fbf..30957f8dee7 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -144,7 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_cv_destroy(&s->cv); gpr_free(s->ports); - grpc_workqueue_unref(s->workqueue); + GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue"); gpr_free(s); } diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 0bfa959953c..a236651fbd4 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -52,8 +52,23 @@ typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ grpc_workqueue *grpc_workqueue_create(void); +void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously); + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#define GRPC_WORKQUEUE_REF(p, r) \ + grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_WORKQUEUE_UNREF(p, r) \ + grpc_workqueue_unref((p), __FILE__, __LINE__, (r)) +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason); +void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason); +#else +#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_workqueue *workqueue); +#endif /** Bind this workqueue to a pollset */ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 26626bef3bc..ec3ce713b0c 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -35,12 +35,15 @@ #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/workqueue.h" #include #include +#include +#include + +#include "src/core/iomgr/fd_posix.h" static void on_readable(void *arg, int success); @@ -61,15 +64,81 @@ grpc_workqueue *grpc_workqueue_create(void) { return workqueue; } +static void shutdown_thread(void *arg) { + grpc_iomgr_closure *todo = arg; + + while (todo) { + grpc_iomgr_closure *next = todo->next; + todo->cb(todo->cb_arg, todo->success); + todo = next; + } +} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +static size_t count_waiting(grpc_workqueue *workqueue) { + size_t i = 0; + grpc_iomgr_closure *c; + for (c = workqueue->head.next; c; c = c->next) { + i++; + } + return i; +} +#endif + +void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) { + grpc_iomgr_closure *todo; + gpr_thd_id thd; + + gpr_mu_lock(&workqueue->mu); +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue, + count_waiting(workqueue), + asynchronously ? "asynchronously" : "synchronously"); +#endif + todo = workqueue->head.next; + workqueue->head.next = NULL; + workqueue->tail = &workqueue->head; + gpr_mu_unlock(&workqueue->mu); + + if (todo != NULL) { + if (asynchronously) { + gpr_thd_new(&thd, shutdown_thread, todo, NULL); + } else { + while (todo) { + grpc_iomgr_closure *next = todo->next; + todo->cb(todo->cb_arg, todo->success); + todo = next; + } + } + } +} + static void workqueue_destroy(grpc_workqueue *workqueue) { + GPR_ASSERT(workqueue->tail == &workqueue->head); grpc_fd_shutdown(workqueue->wakeup_read_fd); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", + workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, + reason); +#else void grpc_workqueue_ref(grpc_workqueue *workqueue) { +#endif gpr_ref(&workqueue->refs); } +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", + workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, + reason); +#else void grpc_workqueue_unref(grpc_workqueue *workqueue) { +#endif if (gpr_unref(&workqueue->refs)) { workqueue_destroy(workqueue); } @@ -94,6 +163,10 @@ static void on_readable(void *arg, int success) { return; } else { gpr_mu_lock(&workqueue->mu); +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, + count_waiting(workqueue)); +#endif todo = workqueue->head.next; workqueue->head.next = NULL; workqueue->tail = &workqueue->head; @@ -119,6 +192,10 @@ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, } workqueue->tail->next = closure; workqueue->tail = closure; +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG + gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue, + count_waiting(workqueue)); +#endif gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index aba0f94fd48..efddb9ee479 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -365,7 +365,7 @@ static void server_delete(grpc_server *server) { } request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); - grpc_workqueue_unref(server->workqueue); + GRPC_WORKQUEUE_UNREF(server->workqueue, "destroy"); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -392,6 +392,7 @@ static void orphan_channel(channel_data *chand) { static void finish_destroy_channel(void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; + gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server"); server_unref(server); } @@ -404,6 +405,8 @@ static void destroy_channel(channel_data *chand) { maybe_finish_shutdown(chand->server); chand->finish_destroy_channel_closure.cb = finish_destroy_channel; chand->finish_destroy_channel_closure.cb_arg = chand; + gpr_log(GPR_DEBUG, "queue finish_destroy_channel: %p on %p", chand->channel, + chand->server->workqueue); grpc_workqueue_push(chand->server->workqueue, &chand->finish_destroy_channel_closure, 1); } @@ -1074,6 +1077,8 @@ void grpc_server_destroy(grpc_server *server) { gpr_mu_unlock(&server->mu_global); + grpc_workqueue_flush(server->workqueue, 0); + server_unref(server); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 705a025ccaa..896f5a331ae 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -242,8 +242,9 @@ static void init_transport(grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; - grpc_connectivity_state_init(&t->channel_callback.state_tracker, workqueue, - GRPC_CHANNEL_READY, "transport"); + grpc_connectivity_state_init( + &t->channel_callback.state_tracker, workqueue, GRPC_CHANNEL_READY, + is_client ? "client_transport" : "server_transport"); gpr_slice_buffer_init(&t->global.qbuf); diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 716280505e8..cf23adfbb23 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -61,6 +61,8 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, const char *name) { tracker->current_state = init_state; tracker->watchers = NULL; + tracker->workqueue = workqueue; + GRPC_WORKQUEUE_REF(workqueue, name); tracker->name = gpr_strdup(name); } @@ -79,6 +81,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { grpc_workqueue_push(tracker->workqueue, w->notify, success); gpr_free(w); } + GRPC_WORKQUEUE_UNREF(tracker->workqueue, tracker->name); gpr_free(tracker->name); } diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 01fcb9275e4..48534ce0605 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -168,6 +168,6 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, grpc_completion_queue_destroy(a.cq); gpr_slice_buffer_destroy(&outgoing); - grpc_workqueue_unref(workqueue); + GRPC_WORKQUEUE_UNREF(workqueue, "destroy"); grpc_shutdown(); } diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c index 13cc6b4d99e..16dc7044051 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.c +++ b/test/core/end2end/fixtures/h2_sockpair+trace.c @@ -171,7 +171,7 @@ int main(int argc, char **argv) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c index 044c9e47b04..9d7e973b8e2 100644 --- a/test/core/end2end/fixtures/h2_sockpair.c +++ b/test/core/end2end/fixtures/h2_sockpair.c @@ -157,7 +157,8 @@ int main(int argc, char **argv) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_unref(g_workqueue); + grpc_workqueue_flush(g_workqueue, 1); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c index 0fec408c30c..2a3002bc579 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.c +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c @@ -157,7 +157,7 @@ int main(int argc, char **argv) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index 3c1c50e3c44..9fe54771bd5 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -74,7 +74,7 @@ int main(int argc, char **argv) { g_workqueue = grpc_workqueue_create(); grpc_endpoint_tests(configs[0], &g_pollset); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index f5fd3b017d9..30852050ceb 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -61,7 +61,7 @@ int main(int argc, char **argv) { grpc_endpoint_destroy(p.server); } - grpc_workqueue_unref(workqueue); + GRPC_WORKQUEUE_UNREF(workqueue, "destroy"); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 8b68439dce1..0f9b323e770 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -495,7 +495,7 @@ int main(int argc, char **argv) { test_grpc_fd(); test_grpc_fd_change(); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index c73347b94fb..4a6786c0a24 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -247,7 +247,7 @@ int main(int argc, char **argv) { test_times_out(); grpc_pollset_set_destroy(&g_pollset_set); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 6d080c2b975..01162eb2252 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -479,7 +479,7 @@ int main(int argc, char **argv) { grpc_pollset_init(&g_pollset); run_tests(); grpc_endpoint_tests(configs[0], &g_pollset); - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_shutdown(); diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 99a9de7c9d0..5663af80e8d 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -66,7 +66,7 @@ static void test_add_closure(void) { GPR_ASSERT(done); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_workqueue_unref(wq); + GRPC_WORKQUEUE_UNREF(wq, "destroy"); } static void done_shutdown(void *arg) { grpc_pollset_destroy(arg); } diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 67fa9322a70..e0bdea527ad 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -170,7 +170,7 @@ int main(int argc, char **argv) { grpc_pollset_init(&g_pollset); grpc_endpoint_tests(configs[0], &g_pollset); test_leftover(configs[1], 1); - grpc_workqueue_unref(g_workqueue); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_shutdown();