Rebase with head and resolve conflicts

pull/11703/head
yang-g 8 years ago
parent 0eaf7debd2
commit 533fbd362f
  1. 76
      src/core/lib/surface/completion_queue.c

@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
void (*begin_op)(grpc_completion_queue *cq, void *tag);
int (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@ -522,28 +522,55 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
gpr_mu_lock(cq->mu);
}
for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
if (cq->outstanding_tags[i] == tag) {
cq->outstanding_tag_count--;
GPR_SWAP(void *, cq->outstanding_tags[i],
cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
}
}
if (lock_cq) {
gpr_mu_unlock(cq->mu);
}
GPR_ASSERT(found);
}
#else
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
while (true) {
gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events.count);
gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
if (count == 0) {
cq_check_tag(cq, tag, true); /* Used in debug builds only */
return 1;
} else if (gpr_atm_no_barrier_cas(&cqd->pending_events.count, count,
count + 1)) {
} else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
break;
}
}
return 0;
}
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
gpr_ref(&cqd->pending_events);
return 0;
}
void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
int grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
@ -555,35 +582,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
gpr_mu_unlock(cq->mu);
#endif
cq->vtable->begin_op(cq, tag);
}
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
if (lock_cq) {
gpr_mu_lock(cq->mu);
}
for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
if (cq->outstanding_tags[i] == tag) {
cq->outstanding_tag_count--;
GPR_SWAP(void *, cq->outstanding_tags[i],
cq->outstanding_tags[cq->outstanding_tag_count]);
found = 1;
break;
}
}
if (lock_cq) {
gpr_mu_unlock(cq->mu);
}
GPR_ASSERT(found);
return cq->vtable->begin_op(cq, tag);
}
#else
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion

Loading…
Cancel
Save