From 916a5003a8046cf36a0140fc8cbaac04ffda5f71 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 26 May 2015 11:49:10 -0700 Subject: [PATCH] Add silent events to completion queue - to allow server shutdown to complete --- src/core/surface/completion_queue.c | 20 ++++++++++++++++++++ src/core/surface/completion_queue.h | 6 ++++++ src/core/surface/server.c | 14 ++++++++++---- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b48fbace315..bc7c15178c8 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -167,6 +167,26 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, } } +void grpc_cq_begin_silent_op(grpc_completion_queue *cc) { + gpr_ref(&cc->refs); +} + +void grpc_cq_end_silent_op(grpc_completion_queue *cc) { + int shutdown = 0; + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_unref(&cc->refs)) { + GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); + cc->shutdown = 1; + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + shutdown = 1; + } + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (shutdown) { + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); + } +} + /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ static event *create_shutdown_event(void) { event *ev = gpr_malloc(sizeof(event)); diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 7b6fad98fdf..896b7f708be 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -50,6 +50,12 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, int success); +/* Begin a 'silent operation' - one that blocks completion queue shutdown + until it is ended */ +void grpc_cq_begin_silent_op(grpc_completion_queue *cc); +/* End such an operation */ +void grpc_cq_end_silent_op(grpc_completion_queue *cc); + /* disable polling for some tests */ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 90d5acc569d..2802671bdc4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -533,8 +533,9 @@ static void destroy_call_elem(grpc_call_element *elem) { call_list_remove(elem->call_data, i); } if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) { - for (i = 0; i < chand->server->num_shutdown_tags; i++) { - for (j = 0; j < chand->server->cq_count; j++) { + for (j = 0; j < chand->server->cq_count; j++) { + grpc_cq_end_silent_op(chand->server->cqs[j]); + for (i = 0; i < chand->server->num_shutdown_tags; i++) { grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i], NULL, 1); } @@ -821,6 +822,10 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, return; } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_begin_silent_op(server->cqs[i]); + } + nchannels = 0; for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { @@ -857,8 +862,9 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, server->shutdown = 1; if (server->lists[ALL_CALLS] == NULL) { - for (i = 0; i < server->num_shutdown_tags; i++) { - for (j = 0; j < server->cq_count; j++) { + for (j = 0; j < server->cq_count; j++) { + grpc_cq_end_silent_op(server->cqs[j]); + for (i = 0; i < server->num_shutdown_tags; i++) { grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1); } }