From 8ac5c6dedad0482ef6864ef310c79bf2e9c0e19b Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 11 May 2017 14:13:11 -0700 Subject: [PATCH] Merge with master and fix a bad-merge --- src/core/lib/surface/completion_queue.c | 164 ++++++++++++------------ 1 file changed, 83 insertions(+), 81 deletions(-) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index d49847f461f..52f2df0c6ce 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -609,14 +609,16 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); - if (grpc_api_trace || - (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) { + if (GRPC_TRACER_ON(grpc_api_trace) || + (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } @@ -829,7 +831,7 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, /* The main polling work happens in grpc_pollset_work */ gpr_mu_lock(cqd->mu); grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), - NULL, now, deadline); + NULL, now, iteration_deadline); gpr_mu_unlock(cqd->mu); if (err != GRPC_ERROR_NONE) { @@ -1029,97 +1031,97 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, } is_finished_arg.first_loop = false; del_plucker(cc, tag, &worker); - done: - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + } +done: + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(is_finished_arg.stolen_completion == NULL); - GPR_TIMER_END("grpc_completion_queue_pluck", 0); + GPR_TIMER_END("grpc_completion_queue_pluck", 0); - return ret; - } + return ret; +} - grpc_event grpc_completion_queue_pluck(grpc_completion_queue * cc, void *tag, - gpr_timespec deadline, - void *reserved) { - return cc->vtable->pluck(cc, tag, deadline, reserved); - } +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { + return cc->vtable->pluck(cc, tag, deadline, reserved); +} - /* Finishes the completion queue shutdown. This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ - static void cq_finish_shutdown(grpc_exec_ctx * exec_ctx, - grpc_completion_queue * cc) { - cq_data *cqd = &cc->data; - - GPR_ASSERT(cqd->shutdown_called); - GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); - gpr_atm_no_barrier_store(&cqd->shutdown, 1); - - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); - } +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; - /* Shutdown simply drops a ref that we reserved at creation time; if we drop - to zero here, then enter shutdown mode and wake up any waiters */ - void grpc_completion_queue_shutdown(grpc_completion_queue * cc) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); - GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - cq_data *cqd = &cc->data; + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); - gpr_mu_lock(cqd->mu); - if (cqd->shutdown_called) { - gpr_mu_unlock(cqd->mu); - GPR_TIMER_END("grpc_completion_queue_shutdown", 0); - return; - } - cqd->shutdown_called = 1; - if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown(&exec_ctx, cc); - } + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cqd->pollset_shutdown_done); +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); + GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); + cq_data *cqd = &cc->data; + + gpr_mu_lock(cqd->mu); + if (cqd->shutdown_called) { gpr_mu_unlock(cqd->mu); - grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; } + cqd->shutdown_called = 1; + if (gpr_unref(&cqd->pending_events)) { + cq_finish_shutdown(&exec_ctx, cc); + } + gpr_mu_unlock(cqd->mu); + grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); +} - void grpc_completion_queue_destroy(grpc_completion_queue * cc) { - GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); - GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); - grpc_completion_queue_shutdown(cc); - - /* TODO (sreek): This should not ideally be here. Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); - } +void grpc_completion_queue_destroy(grpc_completion_queue *cc) { + GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); + GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); + grpc_completion_queue_shutdown(cc); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_completion_queue_destroy", 0); + /* TODO (sreek): This should not ideally be here. Refactor it into the + * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ + if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { + GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); } - grpc_pollset *grpc_cq_pollset(grpc_completion_queue * cc) { - return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; - } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_completion_queue_destroy", 0); +} - grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset * ps) { - return CQ_FROM_POLLSET(ps); - } +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; +} - void grpc_cq_mark_server_cq(grpc_completion_queue * cc) { - cc->data.is_server_cq = 1; - } +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { + return CQ_FROM_POLLSET(ps); +} - bool grpc_cq_is_server_cq(grpc_completion_queue * cc) { - return cc->data.is_server_cq; - } +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { + cc->data.is_server_cq = 1; +} - bool grpc_cq_can_listen(grpc_completion_queue * cc) { - return cc->poller_vtable->can_listen; - } +bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { + return cc->data.is_server_cq; +} + +bool grpc_cq_can_listen(grpc_completion_queue *cc) { + return cc->poller_vtable->can_listen; +}