|
|
|
@ -71,16 +71,15 @@ grpc_completion_queue *grpc_completion_queue_create(void) { |
|
|
|
|
gpr_ref_init(&cc->owning_refs, 2); |
|
|
|
|
grpc_pollset_init(&cc->pollset); |
|
|
|
|
cc->completed_tail = &cc->completed_head; |
|
|
|
|
cc->completed_head.next = (gpr_uintptr) cc->completed_tail; |
|
|
|
|
cc->completed_head.next = (gpr_uintptr)cc->completed_tail; |
|
|
|
|
return cc; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifdef GRPC_CQ_REF_COUNT_DEBUG |
|
|
|
|
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, |
|
|
|
|
const char *file, int line) { |
|
|
|
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", |
|
|
|
|
cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, |
|
|
|
|
reason); |
|
|
|
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, |
|
|
|
|
(int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); |
|
|
|
|
#else |
|
|
|
|
void grpc_cq_internal_ref(grpc_completion_queue *cc) { |
|
|
|
|
#endif |
|
|
|
@ -95,14 +94,13 @@ static void on_pollset_destroy_done(void *arg) { |
|
|
|
|
#ifdef GRPC_CQ_REF_COUNT_DEBUG |
|
|
|
|
void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, |
|
|
|
|
const char *file, int line) { |
|
|
|
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", |
|
|
|
|
cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, |
|
|
|
|
reason); |
|
|
|
|
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, |
|
|
|
|
(int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); |
|
|
|
|
#else |
|
|
|
|
void grpc_cq_internal_unref(grpc_completion_queue *cc) { |
|
|
|
|
#endif |
|
|
|
|
if (gpr_unref(&cc->owning_refs)) { |
|
|
|
|
GPR_ASSERT(cc->completed_head.next == (gpr_uintptr) &cc->completed_head); |
|
|
|
|
GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head); |
|
|
|
|
grpc_pollset_destroy(&cc->pollset); |
|
|
|
|
gpr_free(cc); |
|
|
|
|
} |
|
|
|
@ -115,28 +113,27 @@ void grpc_cq_begin_op(grpc_completion_queue *cc) { |
|
|
|
|
/* Signal the end of an operation - if this is the last waiting-to-be-queued
|
|
|
|
|
event, then enter shutdown mode */ |
|
|
|
|
/* Queue a GRPC_OP_COMPLETED operation */ |
|
|
|
|
void grpc_cq_end_op( |
|
|
|
|
grpc_completion_queue *cc,
|
|
|
|
|
void *tag,
|
|
|
|
|
int success,
|
|
|
|
|
void (*done)(void *done_arg, grpc_cq_completion *storage),
|
|
|
|
|
void *done_arg, |
|
|
|
|
grpc_cq_completion *storage) { |
|
|
|
|
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, |
|
|
|
|
void (*done)(void *done_arg, grpc_cq_completion *storage), |
|
|
|
|
void *done_arg, grpc_cq_completion *storage) { |
|
|
|
|
int shutdown = gpr_unref(&cc->pending_events); |
|
|
|
|
|
|
|
|
|
storage->tag = tag; |
|
|
|
|
storage->done = done; |
|
|
|
|
storage->done_arg = done_arg; |
|
|
|
|
storage->next = ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); |
|
|
|
|
storage->next = |
|
|
|
|
((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0)); |
|
|
|
|
|
|
|
|
|
if (!shutdown) { |
|
|
|
|
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); |
|
|
|
|
cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); |
|
|
|
|
cc->completed_tail->next = |
|
|
|
|
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); |
|
|
|
|
cc->completed_tail = storage; |
|
|
|
|
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); |
|
|
|
|
} else { |
|
|
|
|
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); |
|
|
|
|
cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); |
|
|
|
|
cc->completed_tail->next = |
|
|
|
|
((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); |
|
|
|
|
cc->completed_tail = storage; |
|
|
|
|
GPR_ASSERT(!cc->shutdown); |
|
|
|
|
GPR_ASSERT(cc->shutdown_called); |
|
|
|
@ -154,7 +151,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, |
|
|
|
|
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); |
|
|
|
|
for (;;) { |
|
|
|
|
if (cc->completed_tail != &cc->completed_head) { |
|
|
|
|
grpc_cq_completion *c = (grpc_cq_completion *) cc->completed_head.next; |
|
|
|
|
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; |
|
|
|
|
cc->completed_head.next = c->next & ~(gpr_uintptr)1; |
|
|
|
|
if (c == cc->completed_tail) { |
|
|
|
|
cc->completed_tail = &cc->completed_head; |
|
|
|
@ -194,9 +191,11 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, |
|
|
|
|
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); |
|
|
|
|
for (;;) { |
|
|
|
|
prev = &cc->completed_head; |
|
|
|
|
while ((c = (grpc_cq_completion*)(prev->next & ~(gpr_uintptr)1)) != &cc->completed_head) { |
|
|
|
|
while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) != |
|
|
|
|
&cc->completed_head) { |
|
|
|
|
if (c->tag == tag) { |
|
|
|
|
prev->next = (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); |
|
|
|
|
prev->next = |
|
|
|
|
(prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1); |
|
|
|
|
if (c == cc->completed_tail) { |
|
|
|
|
cc->completed_tail = prev; |
|
|
|
|
} |
|
|
|
|