Merge branch 'who-combines-the-combiners' into direct-calls

reviewable/pr8008/r2
Craig Tiller 8 years ago
commit 39b9914d85
  1. 89
      src/core/lib/surface/completion_queue.c

@ -71,6 +71,9 @@ struct grpc_completion_queue {
gpr_refcount pending_events;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs;
/** counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
/** 0 initially, 1 once we've begun shutting down */
int shutdown;
int shutdown_called;
@ -125,15 +128,6 @@ void grpc_cq_global_shutdown(void) {
}
}
struct grpc_cq_alarm {
grpc_timer alarm;
grpc_cq_completion completion;
/** completion queue where events about this alarm will be posted */
grpc_completion_queue *cq;
/** user supplied tag */
void *tag;
};
grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc;
GPR_ASSERT(!reserved);
@ -170,6 +164,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->is_server_cq = 0;
cc->is_non_listening_server_cq = 0;
cc->num_pluckers = 0;
gpr_atm_no_barrier_store(&cc->things_queued_ever, 0);
#ifndef NDEBUG
cc->outstanding_tag_count = 0;
#endif
@ -280,6 +275,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT(found);
#endif
shutdown = gpr_unref(&cc->pending_events);
gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1);
if (!shutdown) {
cc->completed_tail->next =
((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next);
@ -318,6 +314,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
}
typedef struct {
gpr_atm last_seen_things_queued_ever;
grpc_completion_queue *cq;
gpr_timespec deadline;
grpc_cq_completion *stolen_completion;
@ -328,17 +325,23 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu);
if (cq->completed_tail != &cq->completed_head) {
a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
if (a->stolen_completion == cq->completed_tail) {
cq->completed_tail = &cq->completed_head;
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cq->things_queued_ever);
if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cq->things_queued_ever);
if (cq->completed_tail != &cq->completed_head) {
a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
if (a->stolen_completion == cq->completed_tail) {
cq->completed_tail = &cq->completed_head;
}
gpr_mu_unlock(cq->mu);
return true;
}
gpr_mu_unlock(cq->mu);
return true;
}
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
@ -386,12 +389,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
NULL};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
@ -496,23 +500,29 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu);
grpc_cq_completion *c;
grpc_cq_completion *prev = &cq->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cq->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
if (c == cq->completed_tail) {
cq->completed_tail = prev;
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cq->things_queued_ever);
if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
gpr_atm_no_barrier_load(&cq->things_queued_ever);
grpc_cq_completion *c;
grpc_cq_completion *prev = &cq->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cq->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
if (c == cq->completed_tail) {
cq->completed_tail = prev;
}
gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
}
gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
prev = c;
}
prev = c;
gpr_mu_unlock(cq->mu);
}
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
}
@ -543,12 +553,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load(&cc->things_queued_ever), cc, deadline, NULL,
tag};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);

Loading…
Cancel
Save