Have server hold a reference to completion queues

In the presence of garbage collectors, this helps ensure that completion
queues outlive the servers that depend upon them.
pull/1377/head
Author: Craig Tiller (10 years ago)
Commit: 8950461da7 (parent: 8bf5fc88d4)

3 changed files:
  src/core/surface/completion_queue.c (22 lines changed)
  src/core/surface/completion_queue.h (3 lines changed)
  src/core/surface/server.c (5 lines changed)
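
At its core the change is manual reference counting: addcq() in server.c now takes a reference on every distinct completion queue registered with the server, and server_unref() releases those references only when the server itself is finally destroyed, so a garbage collector that happens to finalize the server's wrapper object before the queue's cannot tear the queue down while the server still needs it. Below is a minimal, self-contained sketch of that ownership pattern; every name in it (cq, server_create, and so on) is illustrative rather than gRPC's, and a plain int stands in for the atomic gpr_refcount the real code uses.

/* Sketch: a dependent object pins the queue it uses, so the queue
   survives however the two finalizers happen to be ordered. */
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  int refs;
} cq;

static cq *cq_create(void) {
  cq *c = malloc(sizeof(*c));
  c->refs = 1; /* the application's own reference */
  return c;
}

static void cq_ref(cq *c) { ++c->refs; }

static void cq_unref(cq *c) {
  if (--c->refs == 0) {
    printf("queue destroyed\n");
    free(c);
  }
}

typedef struct {
  cq *queue;
} server;

static server *server_create(cq *c) {
  server *s = malloc(sizeof(*s));
  s->queue = c;
  cq_ref(c); /* mirrors addcq(): pin the queue the server depends on */
  return s;
}

static void server_destroy(server *s) {
  cq_unref(s->queue); /* mirrors server_unref(): drop the pin last */
  free(s);
}

int main(void) {
  cq *c = cq_create();
  server *s = server_create(c);
  cq_unref(c);       /* a collector drops the app's queue reference first... */
  server_destroy(s); /* ...yet the queue lives until the server is gone */
  return 0;
}

Without the cq_ref() taken in server_create, the first cq_unref() in main would free the queue while the server still held a dangling pointer to it, which is exactly the ordering hazard the commit message describes.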

--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -132,9 +132,22 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
   return ev;
 }
 
+void grpc_cq_internal_ref(grpc_completion_queue *cc) {
+  gpr_ref(&cc->refs);
+}
+
+void grpc_cq_internal_unref(grpc_completion_queue *cc) {
+  if (gpr_unref(&cc->refs)) {
+    GPR_ASSERT(!cc->shutdown);
+    GPR_ASSERT(cc->shutdown_called);
+    cc->shutdown = 1;
+    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+  }
+}
+
 void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
                       grpc_completion_type type) {
-  gpr_ref(&cc->refs);
+  grpc_cq_internal_ref(cc);
   if (call) grpc_call_internal_ref(call);
 #ifndef NDEBUG
   gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
@@ -148,12 +161,7 @@ static void end_op_locked(grpc_completion_queue *cc,
 #ifndef NDEBUG
   GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
 #endif
-  if (gpr_unref(&cc->refs)) {
-    GPR_ASSERT(!cc->shutdown);
-    GPR_ASSERT(cc->shutdown_called);
-    cc->shutdown = 1;
-    gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
-  }
+  grpc_cq_internal_unref(cc);
 }
 
 void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
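
One detail worth noting in the hunk above: gpr_unref() returns nonzero exactly when the count it guards reaches zero, so the body of grpc_cq_internal_unref runs once, on release of the last reference, and that is the moment a previously requested shutdown actually completes and blocked waiters are woken. A small sketch of that handshake follows; the struct and function names are invented stand-ins for the real fields.

/* Sketch: shutdown is requested by the application but only completes
   when the final reference is released. */
#include <assert.h>

typedef struct {
  int refs;
  int shutdown_called; /* shutdown was requested */
  int shutdown;        /* shutdown has completed; waiters may return */
} cq_state;

static void cq_state_unref(cq_state *c) {
  if (--c->refs == 0) { /* where gpr_unref(&cc->refs) returns nonzero */
    assert(c->shutdown_called && !c->shutdown);
    c->shutdown = 1;
    /* the real code also wakes waiters here:
       gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); */
  }
}

int main(void) {
  cq_state c = {1, 1, 0}; /* one reference left, shutdown already requested */
  cq_state_unref(&c);     /* the last unref completes the shutdown */
  assert(c.shutdown);
  return 0;
}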

--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -43,6 +43,9 @@
    grpc_event_finish */
 typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error);
 
+void grpc_cq_internal_ref(grpc_completion_queue *cc);
+void grpc_cq_internal_unref(grpc_completion_queue *cc);
+
 /* Flag that an operation is beginning: the completion channel will not finish
    shutdown until a corresponding grpc_cq_end_* call is made */
 void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,

--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -262,6 +262,7 @@ static void server_ref(grpc_server *server) {
 static void server_unref(grpc_server *server) {
   registered_method *rm;
+  size_t i;
   if (gpr_unref(&server->internal_refcount)) {
     grpc_channel_args_destroy(server->channel_args);
     gpr_mu_destroy(&server->mu);
@@ -275,6 +276,9 @@ static void server_unref(grpc_server *server) {
       requested_call_array_destroy(&rm->requested);
       gpr_free(rm);
     }
+    for (i = 0; i < server->cq_count; i++) {
+      grpc_cq_internal_unref(server->cqs[i]);
+    }
     gpr_free(server->cqs);
     gpr_free(server->pollsets);
     gpr_free(server->shutdown_tags);
@@ -601,6 +605,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
   for (i = 0; i < server->cq_count; i++) {
     if (server->cqs[i] == cq) return;
   }
+  grpc_cq_internal_ref(cq);
   n = server->cq_count++;
   server->cqs = gpr_realloc(server->cqs,
                             server->cq_count * sizeof(grpc_completion_queue *));
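
A note on the server.c side: addcq() scans server->cqs and returns early if the queue is already registered, so each distinct queue contributes exactly one internal reference no matter how many times it is passed in, and the single grpc_cq_internal_unref() per array slot in server_unref() balances it exactly.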
