pull/1731/head
Craig Tiller 10 years ago
parent 640dfaf2ee
commit 6f2d64796d
  1. 16
      src/core/surface/completion_queue.c

@ -80,6 +80,7 @@ struct grpc_completion_queue {
grpc_completion_queue *grpc_completion_queue_create(void) { grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
memset(cc, 0, sizeof(*cc)); memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */ /* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->refs, 1); gpr_ref_init(&cc->refs, 1);
@ -92,15 +93,19 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
void grpc_cq_internal_ref(grpc_completion_queue *cc) { void grpc_cq_internal_ref(grpc_completion_queue *cc) {
gpr_ref(&cc->owning_refs); gpr_ref(&cc->owning_refs);
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
} }
static void on_pollset_destroy_done(void *arg) { static void on_pollset_destroy_done(void *arg) {
grpc_completion_queue *cc = arg; grpc_completion_queue *cc = arg;
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
grpc_cq_internal_unref(cc); grpc_cq_internal_unref(cc);
} }
void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_internal_unref(grpc_completion_queue *cc) {
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
if (gpr_unref(&cc->owning_refs)) { if (gpr_unref(&cc->owning_refs)) {
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
GPR_ASSERT(cc->queue == NULL); GPR_ASSERT(cc->queue == NULL);
grpc_pollset_destroy(&cc->pollset); grpc_pollset_destroy(&cc->pollset);
gpr_free(cc); gpr_free(cc);
@ -118,6 +123,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call) { void *tag, grpc_call *call) {
event *ev = gpr_malloc(sizeof(event)); event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
ev->base.type = type; ev->base.type = type;
ev->base.tag = tag; ev->base.tag = tag;
if (cc->queue == NULL) { if (cc->queue == NULL) {
@ -140,6 +146,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
} }
void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) { void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
gpr_ref(&cc->refs); gpr_ref(&cc->refs);
if (call) GRPC_CALL_INTERNAL_REF(call, "cq"); if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
} }
@ -150,6 +157,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) { int success) {
event *ev; event *ev;
int shutdown = 0; int shutdown = 0;
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success; ev->base.success = success;
@ -162,9 +170,11 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
} }
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0); if (call) GRPC_CALL_INTERNAL_UNREF(call, "cq", 0);
if (shutdown) gpr_log(GPR_DEBUG, "shutdown=%d", shutdown);
if (shutdown) {
grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
} }
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event(void) { static event *create_shutdown_event(void) {
@ -179,6 +189,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
event *ev = NULL; event *ev = NULL;
grpc_event ret; grpc_event ret;
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
grpc_cq_internal_ref(cc); grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) { for (;;) {
@ -261,6 +272,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
event *ev = NULL; event *ev = NULL;
grpc_event ret; grpc_event ret;
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
grpc_cq_internal_ref(cc); grpc_cq_internal_ref(cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) { for (;;) {
@ -295,6 +307,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
/* Shutdown simply drops a ref that we reserved at creation time; if we drop /* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here, then enter shutdown mode and wake up any waiters */ to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
cc->shutdown_called = 1; cc->shutdown_called = 1;
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
@ -310,6 +323,7 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
} }
void grpc_completion_queue_destroy(grpc_completion_queue *cc) { void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
gpr_log(GPR_DEBUG, "%s: %p", __FUNCTION__, cc);
grpc_cq_internal_unref(cc); grpc_cq_internal_unref(cc);
} }

Loading…
Cancel
Save