diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 84749053968..ac4674bbacb 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -75,6 +75,8 @@ struct grpc_exec_ctx { bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; +/* initializer for grpc_exec_ctx: + prefer to use GRPC_EXEC_CTX_INIT whenever possible */ #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ { \ GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, false, finish_check_arg, \ @@ -90,8 +92,10 @@ struct grpc_exec_ctx { { false, finish_check_arg, finish_check } #endif +/* initialize an execution context at the top level of an API call into grpc + (this is safe to use elsewhere, though possibly not as efficient) */ #define GRPC_EXEC_CTX_INIT \ - GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL) + GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL) /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 5978884db83..9eb4dfc6188 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -313,13 +313,37 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(error); } +typedef struct { + grpc_completion_queue *cq; + gpr_timespec deadline; + grpc_cq_completion *stolen_completion; + void *tag; /* for pluck */ +} cq_is_finished_arg; + +static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { + cq_is_finished_arg *a = arg; + grpc_completion_queue *cq = a->cq; + GPR_ASSERT(a->stolen_completion == NULL); + gpr_mu_lock(cq->mu); + if (cq->completed_tail != &cq->completed_head) { + a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; + cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; + if (a->stolen_completion == cq->completed_tail) { + cq->completed_tail = &cq->completed_head; + } + gpr_mu_unlock(cq->mu); + return true; + } + gpr_mu_unlock(cq->mu); + return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) > 0; +} + grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_pollset_worker *worker = NULL; int first_loop = 1; gpr_timespec now; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -335,9 +359,23 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL}; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK( + cq_is_next_finished, &is_finished_arg); + GRPC_CQ_INTERNAL_REF(cc, "next"); gpr_mu_lock(cc->mu); for (;;) { + if (is_finished_arg.stolen_completion != NULL) { + gpr_mu_unlock(cc->mu); + grpc_cq_completion *c = is_finished_arg.stolen_completion; + is_finished_arg.stolen_completion = NULL; + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(&exec_ctx, c->done_arg, c); + break; + } if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; cc->completed_head.next = c->next & ~(uintptr_t)1; @@ -394,6 +432,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_next", 0); @@ -424,6 +463,30 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, GPR_UNREACHABLE_CODE(return ); } +static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { + cq_is_finished_arg *a = arg; + grpc_completion_queue *cq = a->cq; + GPR_ASSERT(a->stolen_completion == NULL); + gpr_mu_lock(cq->mu); + grpc_cq_completion *c; + grpc_cq_completion *prev = &cq->completed_head; + while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != + &cq->completed_head) { + if (c->tag == a->tag) { + prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); + if (c == cq->completed_tail) { + cq->completed_tail = prev; + } + gpr_mu_unlock(cq->mu); + a->stolen_completion = c; + return true; + } + prev = c; + } + gpr_mu_unlock(cq->mu); + return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) > 0; +} + grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline, void *reserved) { grpc_event ret; @@ -432,7 +495,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_pollset_worker *worker = NULL; gpr_timespec now; int first_loop = 1; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -450,9 +512,23 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag}; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK( + cq_is_pluck_finished, &is_finished_arg); + GRPC_CQ_INTERNAL_REF(cc, "pluck"); gpr_mu_lock(cc->mu); for (;;) { + if (is_finished_arg.stolen_completion != NULL) { + gpr_mu_unlock(cc->mu); + grpc_cq_completion *c = is_finished_arg.stolen_completion; + is_finished_arg.stolen_completion = NULL; + ret.type = GRPC_OP_COMPLETE; + ret.success = c->next & 1u; + ret.tag = c->tag; + c->done(&exec_ctx, c->done_arg, c); + break; + } prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != &cc->completed_head) { @@ -527,6 +603,7 @@ done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "pluck"); grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); GPR_TIMER_END("grpc_completion_queue_pluck", 0);