correctly use cq vtable for all functions

pull/10662/head
Sree Kuchibhotla 8 years ago
parent 8e3684518d
commit 736dd90614
  1. 123
      src/core/lib/surface/completion_queue.c
  2. 16
      src/core/lib/surface/completion_queue.h

@ -278,22 +278,52 @@ struct grpc_completion_queue {
const cq_poller_vtable *poller_vtable; const cq_poller_vtable *poller_vtable;
}; };
/* Forward declarations */
static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc);
static size_t cq_size(grpc_completion_queue *cc);
static void cq_begin_op(grpc_completion_queue *cc, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
void *reserved);
static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved);
/* Completion queue vtables based on the completion-type */ /* Completion queue vtables based on the completion-type */
static const cq_vtable g_cq_vtable[] = { static const cq_vtable g_cq_vtable[] = {
/* GRPC_CQ_NEXT */ /* GRPC_CQ_NEXT */
{.cq_completion_type = GRPC_CQ_NEXT, {.cq_completion_type = GRPC_CQ_NEXT,
.size = grpc_cq_size, .size = cq_size,
.begin_op = grpc_cq_begin_op, .begin_op = cq_begin_op,
.end_op = grpc_cq_end_op_for_next, .end_op = cq_end_op_for_next,
.next = grpc_completion_queue_next, .next = cq_next,
.pluck = NULL}, .pluck = NULL},
/* GRPC_CQ_PLUCK */ /* GRPC_CQ_PLUCK */
{.cq_completion_type = GRPC_CQ_PLUCK, {.cq_completion_type = GRPC_CQ_PLUCK,
.size = grpc_cq_size, .size = cq_size,
.begin_op = grpc_cq_begin_op, .begin_op = cq_begin_op,
.end_op = grpc_cq_end_op_for_pluck, .end_op = cq_end_op_for_pluck,
.next = NULL, .next = NULL,
.pluck = grpc_completion_queue_pluck}, .pluck = cq_pluck},
}; };
#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
@ -348,7 +378,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
return (long)gpr_atm_no_barrier_load(&q->num_queue_items); return (long)gpr_atm_no_barrier_load(&q->num_queue_items);
} }
size_t grpc_cq_size(grpc_completion_queue *cc) { static size_t cq_size(grpc_completion_queue *cc) {
/* Size of the completion queue and the size of the pollset whose memory is /* Size of the completion queue and the size of the pollset whose memory is
allocated right after that of completion queue */ allocated right after that of completion queue */
return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); return sizeof(grpc_completion_queue) + cc->poller_vtable->size();
@ -450,7 +480,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
} }
} }
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
cq_data *cqd = &cc->data; cq_data *cqd = &cc->data;
#ifndef NDEBUG #ifndef NDEBUG
gpr_mu_lock(cqd->mu); gpr_mu_lock(cqd->mu);
@ -468,8 +498,12 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
gpr_ref(&cqd->pending_events); gpr_ref(&cqd->pending_events);
} }
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
cc->vtable->begin_op(cc, tag);
}
#ifndef NDEBUG #ifndef NDEBUG
void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) { static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
cq_data *cqd = &cc->data; cq_data *cqd = &cc->data;
int found = 0; int found = 0;
if (lock_cq) { if (lock_cq) {
@ -493,28 +527,25 @@ void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {
GPR_ASSERT(found); GPR_ASSERT(found);
} }
#else #else
void check_tag_in_cq(grpc_completion_queue *cc, void *tag, bool lock_cq) {} static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {}
#endif #endif
/* Forward declaration */
static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc);
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
* type of GRPC_CQ_NEXT) */ * type of GRPC_CQ_NEXT) */
void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
void *tag, grpc_error *error, grpc_completion_queue *cc, void *tag,
void (*done)(grpc_exec_ctx *exec_ctx, grpc_error *error,
void *done_arg, void (*done)(grpc_exec_ctx *exec_ctx,
grpc_cq_completion *storage), void *done_arg,
void *done_arg, grpc_cq_completion *storage) { grpc_cq_completion *storage),
GPR_TIMER_BEGIN("grpc_cq_end_op_for_next", 0); void *done_arg, grpc_cq_completion *storage) {
GPR_TIMER_BEGIN("cq_end_op_for_next", 0);
if (grpc_api_trace || if (grpc_api_trace ||
(grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error); const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_cq_end_op_for_mext(exec_ctx=%p, cc=%p, tag=%p, error=%s, " "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)", "done=%p, done_arg=%p, storage=%p)",
7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
@ -530,7 +561,7 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
storage->done_arg = done_arg; storage->done_arg = done_arg;
storage->next = (uintptr_t)(is_success); storage->next = (uintptr_t)(is_success);
check_tag_in_cq(cc, tag, true); /* Used in debug builds only */ cq_check_tag(cc, tag, true); /* Used in debug builds only */
/* Add the completion to the queue */ /* Add the completion to the queue */
cq_event_queue_push(&cqd->queue, storage); cq_event_queue_push(&cqd->queue, storage);
@ -554,30 +585,30 @@ void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
gpr_mu_unlock(cqd->mu); gpr_mu_unlock(cqd->mu);
} }
GPR_TIMER_END("grpc_cq_end_op_for_next", 0); GPR_TIMER_END("cq_end_op_for_next", 0);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
* type of GRPC_CQ_PLUCK) */ * type of GRPC_CQ_PLUCK) */
void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc, void *tag, grpc_completion_queue *cc, void *tag,
grpc_error *error, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg, void *done_arg,
grpc_cq_completion *storage), grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) { void *done_arg, grpc_cq_completion *storage) {
cq_data *cqd = &cc->data; cq_data *cqd = &cc->data;
int is_success = (error == GRPC_ERROR_NONE); int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("grpc_cq_end_op_for_pluck", 0); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
if (grpc_api_trace || if (grpc_api_trace ||
(grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
const char *errmsg = grpc_error_string(error); const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
"done=%p, done_arg=%p, storage=%p)", "done=%p, done_arg=%p, storage=%p)",
7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
@ -591,7 +622,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success));
gpr_mu_lock(cqd->mu); gpr_mu_lock(cqd->mu);
check_tag_in_cq(cc, tag, false); /* Used in debug builds only */ cq_check_tag(cc, tag, false); /* Used in debug builds only */
/* Add to the list of completions */ /* Add to the list of completions */
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
@ -625,7 +656,7 @@ void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(cqd->mu); gpr_mu_unlock(cqd->mu);
} }
GPR_TIMER_END("grpc_cq_end_op_for_pluck", 0); GPR_TIMER_END("cq_end_op_for_pluck", 0);
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -698,8 +729,8 @@ static void dump_pending_tags(grpc_completion_queue *cc) {
static void dump_pending_tags(grpc_completion_queue *cc) {} static void dump_pending_tags(grpc_completion_queue *cc) {}
#endif #endif
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
gpr_timespec deadline, void *reserved) { void *reserved) {
grpc_event ret; grpc_event ret;
gpr_timespec now; gpr_timespec now;
cq_data *cqd = &cc->data; cq_data *cqd = &cc->data;
@ -827,6 +858,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
return ret; return ret;
} }
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
return cc->vtable->next(cc, deadline, reserved);
}
static int add_plucker(grpc_completion_queue *cc, void *tag, static int add_plucker(grpc_completion_queue *cc, void *tag,
grpc_pollset_worker **worker) { grpc_pollset_worker **worker) {
cq_data *cqd = &cc->data; cq_data *cqd = &cc->data;
@ -885,8 +921,8 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0;
} }
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved) { gpr_timespec deadline, void *reserved) {
grpc_event ret; grpc_event ret;
grpc_cq_completion *c; grpc_cq_completion *c;
grpc_cq_completion *prev; grpc_cq_completion *prev;
@ -1019,6 +1055,11 @@ done:
return ret; return ret;
} }
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline, void *reserved) {
return cc->vtable->pluck(cc, tag, deadline, reserved);
}
/* Finishes the completion queue shutdown. This means that there are no more /* Finishes the completion queue shutdown. This means that there are no more
completion events / tags expected from the completion queue completion events / tags expected from the completion queue
- Must be called under completion queue lock - Must be called under completion queue lock

@ -80,8 +80,6 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc);
#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc) #define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc)
#endif #endif
size_t grpc_cq_size(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish /* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made. shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. */ \a tag is currently used only in debug builds. */
@ -89,20 +87,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to /* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */ grpc_cq_begin_op */
void grpc_cq_end_op_for_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
void grpc_cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cc, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx,
void *done_arg,
grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage);
void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
void *tag, grpc_error *error, void *tag, grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,

Loading…
Cancel
Save