Merge with master and fix a bad-merge

pull/10662/head
Sree Kuchibhotla 8 years ago
parent 0fb74c2bb7
commit 8ac5c6deda
1 changed file
  164  src/core/lib/surface/completion_queue.c

@@ -609,14 +609,16 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
   GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
-  if (grpc_api_trace ||
-      (grpc_trace_operation_failures && error != GRPC_ERROR_NONE)) {
+  if (GRPC_TRACER_ON(grpc_api_trace) ||
+      (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
+       error != GRPC_ERROR_NONE)) {
     const char *errmsg = grpc_error_string(error);
     GRPC_API_TRACE(
         "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, "
         "done=%p, done_arg=%p, storage=%p)",
         7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
-    if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) {
+    if (GRPC_TRACER_ON(grpc_trace_operation_failures) &&
+        error != GRPC_ERROR_NONE) {
       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
     }
   }
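
This first hunk is part of moving the tracer flags behind the GRPC_TRACER_ON macro: the flags are no longer read as plain globals but queried through the macro before any trace output is formatted. A minimal, self-contained sketch of that guard pattern follows; the demo_* names and DEMO_TRACER_ON are stand-ins invented for this sketch, not gRPC's actual tracer types or macro.

#include <stdbool.h>
#include <stdio.h>

/* Minimal stand-in for the tracer-flag guard pattern; demo_tracer_flag,
   DEMO_TRACER_ON and the demo_* flags are invented for this sketch and are
   not gRPC's actual grpc_tracer_flag / GRPC_TRACER_ON definitions. */
typedef struct {
  bool value;
  const char *name;
} demo_tracer_flag;

#define DEMO_TRACER_ON(flag) ((flag).value)

static demo_tracer_flag demo_api_trace = {true, "api"};
static demo_tracer_flag demo_op_failures = {false, "op_failure"};

int main(void) {
  /* Same shape as the guarded logging in cq_end_op_for_pluck: the message is
     only formatted and emitted when one of the relevant tracers is enabled. */
  bool failed = true;
  if (DEMO_TRACER_ON(demo_api_trace) ||
      (DEMO_TRACER_ON(demo_op_failures) && failed)) {
    printf("cq_end_op_for_pluck(...): tracing enabled, error=%s\n",
           failed ? "cancelled" : "none");
  }
  return 0;
}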
@@ -829,7 +831,7 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
     /* The main polling work happens in grpc_pollset_work */
     gpr_mu_lock(cqd->mu);
     grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc),
-                                              NULL, now, deadline);
+                                              NULL, now, iteration_deadline);
     gpr_mu_unlock(cqd->mu);

     if (err != GRPC_ERROR_NONE) {
@@ -1029,97 +1031,97 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
     }
     is_finished_arg.first_loop = false;
     del_plucker(cc, tag, &worker);
+  }
 done:
   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
   GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck");
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(is_finished_arg.stolen_completion == NULL);

   GPR_TIMER_END("grpc_completion_queue_pluck", 0);

   return ret;
 }

-grpc_event grpc_completion_queue_pluck(grpc_completion_queue * cc, void *tag,
-                                       gpr_timespec deadline,
-                                       void *reserved) {
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+                                       gpr_timespec deadline, void *reserved) {
   return cc->vtable->pluck(cc, tag, deadline, reserved);
 }

 /* Finishes the completion queue shutdown. This means that there are no more
    completion events / tags expected from the completion queue
    - Must be called under completion queue lock
    - Must be called only once in completion queue's lifetime
    - grpc_completion_queue_shutdown() MUST have been called before calling
      this function */
-static void cq_finish_shutdown(grpc_exec_ctx * exec_ctx,
-                               grpc_completion_queue * cc) {
+static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx,
+                               grpc_completion_queue *cc) {
   cq_data *cqd = &cc->data;

   GPR_ASSERT(cqd->shutdown_called);
   GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
   gpr_atm_no_barrier_store(&cqd->shutdown, 1);

   cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc),
                               &cqd->pollset_shutdown_done);
 }

 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
    to zero here, then enter shutdown mode and wake up any waiters */
-void grpc_completion_queue_shutdown(grpc_completion_queue * cc) {
+void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
   GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
   cq_data *cqd = &cc->data;

   gpr_mu_lock(cqd->mu);
   if (cqd->shutdown_called) {
     gpr_mu_unlock(cqd->mu);
     GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
     return;
   }
   cqd->shutdown_called = 1;
   if (gpr_unref(&cqd->pending_events)) {
     cq_finish_shutdown(&exec_ctx, cc);
   }
   gpr_mu_unlock(cqd->mu);
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
 }

-void grpc_completion_queue_destroy(grpc_completion_queue * cc) {
+void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
   GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
   GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
   grpc_completion_queue_shutdown(cc);

   /* TODO (sreek): This should not ideally be here. Refactor it into the
    * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */
   if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) {
     GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0);
   }

   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy");
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_completion_queue_destroy", 0);
 }

-grpc_pollset *grpc_cq_pollset(grpc_completion_queue * cc) {
+grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
   return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
 }

-grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset * ps) {
+grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
   return CQ_FROM_POLLSET(ps);
 }

-void grpc_cq_mark_server_cq(grpc_completion_queue * cc) {
+void grpc_cq_mark_server_cq(grpc_completion_queue *cc) {
   cc->data.is_server_cq = 1;
 }

-bool grpc_cq_is_server_cq(grpc_completion_queue * cc) {
+bool grpc_cq_is_server_cq(grpc_completion_queue *cc) {
   return cc->data.is_server_cq;
 }

-bool grpc_cq_can_listen(grpc_completion_queue * cc) {
+bool grpc_cq_can_listen(grpc_completion_queue *cc) {
   return cc->poller_vtable->can_listen;
 }
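
The comments in the last hunk describe the shutdown contract: a reference is reserved when the queue is created, grpc_completion_queue_shutdown() drops it, and cq_finish_shutdown() runs exactly once, after shutdown has been requested and under the queue lock. The sketch below models that reserve-at-creation / drop-on-shutdown pattern with simplified stand-in types; demo_cq and the demo_* helpers are hypothetical, not gRPC's cq_data or gpr_refcount internals, and the real code additionally uses the mutex and atomics visible in the diff.

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, simplified model of the pattern the comments above describe:
   the queue reserves one reference at creation time, every started operation
   takes another, and shutdown drops the creation-time reference. */
typedef struct {
  int pending_events;    /* starts at 1: the ref reserved at creation */
  bool shutdown_called;  /* shutdown was requested by the application */
  bool shutdown;         /* no more events will ever be delivered */
} demo_cq;

static void demo_cq_init(demo_cq *cq) {
  cq->pending_events = 1;
  cq->shutdown_called = false;
  cq->shutdown = false;
}

/* Counterpart of cq_finish_shutdown: runs once, only after shutdown was
   requested and the last reference is gone. */
static void demo_finish_shutdown(demo_cq *cq) {
  cq->shutdown = true;
  printf("queue fully shut down; waiters can be woken\n");
}

static void demo_begin_op(demo_cq *cq) { cq->pending_events++; }

static void demo_end_op(demo_cq *cq) {
  if (--cq->pending_events == 0) demo_finish_shutdown(cq);
}

static void demo_cq_shutdown(demo_cq *cq) {
  if (cq->shutdown_called) return;  /* calling shutdown twice is a no-op */
  cq->shutdown_called = true;
  demo_end_op(cq);                  /* drop the ref reserved at creation */
}

int main(void) {
  demo_cq cq;
  demo_cq_init(&cq);
  demo_begin_op(&cq);      /* one operation is in flight */
  demo_cq_shutdown(&cq);   /* shutdown requested, but one event is pending */
  demo_end_op(&cq);        /* last event completes -> shutdown finishes */
  return 0;
}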
