From 94aff9ea345eeb7d6f7bfe9ed87981a1fcdfe3ba Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 10 Apr 2017 10:25:03 -0700 Subject: [PATCH] cq_next --- src/core/lib/surface/completion_queue.c | 78 ++++++++++++------------- 1 file changed, 37 insertions(+), 41 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 6e0a4c138ec..65204eaf121 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -83,7 +83,7 @@ struct grpc_completion_queue { useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; /** 0 initially, 1 once we've begun shutting down */ - int shutdown; + gpr_atm shutdown; int shutdown_called; int is_server_cq; /** Can the server cq accept incoming channels */ @@ -147,7 +147,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( gpr_ref_init(&cc->owning_refs, 2); cc->completed_tail = &cc->completed_head; cc->completed_head.next = (uintptr_t)cc->completed_tail; - cc->shutdown = 0; + gpr_atm_no_barrier_store(&cc->shutdown, 0); cc->shutdown_called = 0; cc->is_server_cq = 0; cc->is_non_listening_server_cq = 0; @@ -245,9 +245,9 @@ void grpc_cq_end_op_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } else { - GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; + gpr_atm_no_barrier_store(&cc->shutdown, 1); grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); @@ -337,9 +337,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); cc->completed_tail = storage; - GPR_ASSERT(!cc->shutdown); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cc->shutdown)); GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; + gpr_atm_no_barrier_store(&cc->shutdown, 1); grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); @@ -359,6 +359,7 @@ typedef struct { bool first_loop; } cq_is_finished_arg; +/* TODO (sreek) FIX THIS */ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; @@ -427,9 +428,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, - (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, - reserved)); + 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + reserved)); GPR_ASSERT(!reserved); dump_pending_tags(cc); @@ -437,7 +437,6 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(cc->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cc->things_queued_ever), @@ -448,9 +447,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); grpc_cq_completion *c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -459,28 +458,27 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, c->done(&exec_ctx, c->done_arg, c); break; } - if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; - cc->completed_head.next = c->next & ~(uintptr_t)1; - if (c == cc->completed_tail) { - cc->completed_tail = &cc->completed_head; - } - gpr_mu_unlock(cc->mu); + + gpr_mu_lock(&cc->queue_mu); + grpc_cq_completion *c = (grpc_cq_completion *)gpr_mpscq_pop(&cc->queue); + gpr_mu_unlock(&cc->queue_mu); + + if (c != NULL) { ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; c->done(&exec_ctx, c->done_arg, c); break; } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + + if (gpr_atm_no_barrier_load(&cc->shutdown)) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } + now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); @@ -488,32 +486,31 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } /* Check alarms - these are a global resource so we just ping each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ + May update deadline to ensure timely wakeups. */ gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(cc->mu); continue; - } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, - now, iteration_deadline); - if (err != GRPC_ERROR_NONE) { - gpr_mu_unlock(cc->mu); - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + } - GRPC_ERROR_UNREF(err); - memset(&ret, 0, sizeof(ret)); - ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); - break; - } + gpr_mu_lock(cc->mu); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, + now, iteration_deadline); + gpr_mu_unlock(cc->mu); + if (err != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + dump_pending_tags(cc); + break; } is_finished_arg.first_loop = false; } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); @@ -603,9 +600,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, - (cc, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, - reserved)); + 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec, + (int)deadline.clock_type, reserved)); } GPR_ASSERT(!reserved);