From 8950461da777d4f5f2f7bfc85c307544e7b5307f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 27 Apr 2015 11:48:46 -0700 Subject: [PATCH] Have server hold a reference to completion queues In the presence of garbage collectors, this helps ensure that completion queues outlive the servers that depend upon them. --- src/core/surface/completion_queue.c | 22 +++++++++++++++------- src/core/surface/completion_queue.h | 3 +++ src/core/surface/server.c | 5 +++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 24f4a05071a..029a4213c04 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -132,9 +132,22 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, return ev; } +void grpc_cq_internal_ref(grpc_completion_queue *cc) { + gpr_ref(&cc->refs); +} + +void grpc_cq_internal_unref(grpc_completion_queue *cc) { + if (gpr_unref(&cc->refs)) { + GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(cc->shutdown_called); + cc->shutdown = 1; + gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + } +} + void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, grpc_completion_type type) { - gpr_ref(&cc->refs); + grpc_cq_internal_ref(cc); if (call) grpc_call_internal_ref(call); #ifndef NDEBUG gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); @@ -148,12 +161,7 @@ static void end_op_locked(grpc_completion_queue *cc, #ifndef NDEBUG GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0); #endif - if (gpr_unref(&cc->refs)) { - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - } + grpc_cq_internal_unref(cc); } void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 3a7cc99dda9..41024cda14e 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -43,6 +43,9 @@ grpc_event_finish */ typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error); +void grpc_cq_internal_ref(grpc_completion_queue *cc); +void grpc_cq_internal_unref(grpc_completion_queue *cc); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e7719298701..ff922716e5a 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -262,6 +262,7 @@ static void server_ref(grpc_server *server) { static void server_unref(grpc_server *server) { registered_method *rm; + size_t i; if (gpr_unref(&server->internal_refcount)) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); @@ -275,6 +276,9 @@ static void server_unref(grpc_server *server) { requested_call_array_destroy(&rm->requested); gpr_free(rm); } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -601,6 +605,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } + grpc_cq_internal_ref(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *));