From 736dd90614e810a8cbf35e764ef5e9d7d7be90b3 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 25 Apr 2017 18:26:53 -0700 Subject: [PATCH] correctly use cq vtable for all functions --- src/core/lib/surface/completion_queue.c | 123 ++++++++++++++++-------- src/core/lib/surface/completion_queue.h | 16 --- 2 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 266af30cfff..cfe23ec6d79 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -278,22 +278,52 @@ struct grpc_completion_queue { const cq_poller_vtable *poller_vtable; }; +/* Forward declarations */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc); + +static size_t cq_size(grpc_completion_queue *cc); + +static void cq_begin_op(grpc_completion_queue *cc, void *tag); + +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); + /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ {.cq_completion_type = GRPC_CQ_NEXT, - .size = grpc_cq_size, - .begin_op = grpc_cq_begin_op, - .end_op = grpc_cq_end_op_for_next, - .next = grpc_completion_queue_next, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_next, + .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ {.cq_completion_type = GRPC_CQ_PLUCK, - .size = grpc_cq_size, - .begin_op = grpc_cq_begin_op, - .end_op = grpc_cq_end_op_for_pluck, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_pluck, .next = NULL, - .pluck = grpc_completion_queue_pluck}, + .pluck = cq_pluck}, }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) @@ -348,7 +378,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } -size_t grpc_cq_size(grpc_completion_queue *cc) { +static size_t cq_size(grpc_completion_queue *cc) { /* Size of the completion queue and the size of the pollset whose memory is allocated right after that of completion queue */ return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); @@ -450,7 +480,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { +static void cq_begin_op(grpc_completion_queue *cc, void *tag) { cq_data *cqd = &cc->data; #ifndef NDEBUG gpr_mu_lock(cqd->mu); @@ -468,8 +498,12 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { gpr_ref(&cqd->pending_events); } +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { + cc->vtable->begin_op(cc, tag); +} + #ifndef NDEBUG -void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { cq_data *cqd = &cc->data; int found = 0; if (lock_cq) { @@ -493,28 +527,25 @@ void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { GPR_ASSERT(found); } #else -void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {} +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} #endif -/* Forward declaration */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc); - /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion * type of GRPC_CQ_NEXT) */ -void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { - GPR_TIMER_BEGIN("grpc_cq_end_op_for_next", 0); +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + GPR_TIMER_BEGIN("cq_end_op_for_next", 0); if (grpc_api_trace || (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "grpc_cq_end_op_for_mext(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { @@ -530,7 +561,7 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->done_arg = done_arg; storage->next = (uintptr_t)(is_success); - check_tag_in_cq(cc, tag, true); /* Used in debug builds only */ + cq_check_tag(cc, tag, true); /* Used in debug builds only */ /* Add the completion to the queue */ cq_event_queue_push(&cqd->queue, storage); @@ -554,30 +585,30 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, gpr_mu_unlock(cqd->mu); } - GPR_TIMER_END("grpc_cq_end_op_for_next", 0); + GPR_TIMER_END("cq_end_op_for_next", 0); GRPC_ERROR_UNREF(error); } /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion * type of GRPC_CQ_PLUCK) */ -void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, - grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { cq_data *cqd = &cc->data; int is_success = (error == GRPC_ERROR_NONE); - GPR_TIMER_BEGIN("grpc_cq_end_op_for_pluck", 0); + GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); if (grpc_api_trace || (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "grpc_cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { @@ -591,7 +622,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); gpr_mu_lock(cqd->mu); - check_tag_in_cq(cc, tag, false); /* Used in debug builds only */ + cq_check_tag(cc, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); @@ -625,7 +656,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(cqd->mu); } - GPR_TIMER_END("grpc_cq_end_op_for_pluck", 0); + GPR_TIMER_END("cq_end_op_for_pluck", 0); GRPC_ERROR_UNREF(error); } @@ -698,8 +729,8 @@ static void dump_pending_tags(grpc_completion_queue *cc) { static void dump_pending_tags(grpc_completion_queue *cc) {} #endif -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved) { grpc_event ret; gpr_timespec now; cq_data *cqd = &cc->data; @@ -827,6 +858,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, return ret; } +grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, + gpr_timespec deadline, void *reserved) { + return cc->vtable->next(cc, deadline, reserved); +} + static int add_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { cq_data *cqd = &cc->data; @@ -885,8 +921,8 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; @@ -1019,6 +1055,11 @@ done: return ret; } +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { + return cc->vtable->pluck(cc, tag, deadline, reserved); +} + /* Finishes the completion queue shutdown. This means that there are no more completion events / tags expected from the completion queue - Must be called under completion queue lock diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 4d6aa7fd91d..f7eb148982b 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -80,8 +80,6 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc) #endif -size_t grpc_cq_size(grpc_completion_queue *cc); - /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. */ @@ -89,20 +87,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag); /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to grpc_cq_begin_op */ -void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage); -void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, - grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, - void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage); - void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,