Implement is_finished for cqs

pull/7644/head
Craig Tiller 8 years ago
parent 1e1cd5084b
commit 2f42fdef8e
  1. 6
      src/core/lib/iomgr/exec_ctx.h
  2. 81
      src/core/lib/surface/completion_queue.c

@ -75,6 +75,8 @@ struct grpc_exec_ctx {
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ \
GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, false, finish_check_arg, \
@ -90,8 +92,10 @@ struct grpc_exec_ctx {
{ false, finish_check_arg, finish_check }
#endif
/* initialize an execution context at the top level of an API call into grpc
(this is safe to use elsewhere, though possibly not as efficient) */
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.

@ -313,13 +313,37 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GRPC_ERROR_UNREF(error);
}
typedef struct {
grpc_completion_queue *cq;
gpr_timespec deadline;
grpc_cq_completion *stolen_completion;
void *tag; /* for pluck */
} cq_is_finished_arg;
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu);
if (cq->completed_tail != &cq->completed_head) {
a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next;
cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1;
if (a->stolen_completion == cq->completed_tail) {
cq->completed_tail = &cq->completed_head;
}
gpr_mu_unlock(cq->mu);
return true;
}
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) > 0;
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker *worker = NULL;
int first_loop = 1;
gpr_timespec now;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@ -335,9 +359,23 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, NULL};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_next_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(cc->mu);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
grpc_cq_completion *c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
c->done(&exec_ctx, c->done_arg, c);
break;
}
if (cc->completed_tail != &cc->completed_head) {
grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
cc->completed_head.next = c->next & ~(uintptr_t)1;
@ -394,6 +432,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "next");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_next", 0);
@ -424,6 +463,30 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
GPR_UNREACHABLE_CODE(return );
}
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = arg;
grpc_completion_queue *cq = a->cq;
GPR_ASSERT(a->stolen_completion == NULL);
gpr_mu_lock(cq->mu);
grpc_cq_completion *c;
grpc_cq_completion *prev = &cq->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cq->completed_head) {
if (c->tag == a->tag) {
prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1);
if (c == cq->completed_tail) {
cq->completed_tail = prev;
}
gpr_mu_unlock(cq->mu);
a->stolen_completion = c;
return true;
}
prev = c;
}
gpr_mu_unlock(cq->mu);
return gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) > 0;
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
@ -432,7 +495,6 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
int first_loop = 1;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@ -450,9 +512,23 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
cq_is_finished_arg is_finished_arg = {cc, deadline, NULL, tag};
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(
cq_is_pluck_finished, &is_finished_arg);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(cc->mu);
for (;;) {
if (is_finished_arg.stolen_completion != NULL) {
gpr_mu_unlock(cc->mu);
grpc_cq_completion *c = is_finished_arg.stolen_completion;
is_finished_arg.stolen_completion = NULL;
ret.type = GRPC_OP_COMPLETE;
ret.success = c->next & 1u;
ret.tag = c->tag;
c->done(&exec_ctx, c->done_arg, c);
break;
}
prev = &cc->completed_head;
while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) !=
&cc->completed_head) {
@ -527,6 +603,7 @@ done:
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
GPR_TIMER_END("grpc_completion_queue_pluck", 0);

Loading…
Cancel
Save