|
|
|
@ -196,7 +196,7 @@ typedef struct cq_vtable { |
|
|
|
|
void (*init)(void *data); |
|
|
|
|
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); |
|
|
|
|
void (*destroy)(void *data); |
|
|
|
|
void (*begin_op)(grpc_completion_queue *cq, void *tag); |
|
|
|
|
bool (*begin_op)(grpc_completion_queue *cq, void *tag); |
|
|
|
|
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, |
|
|
|
|
grpc_error *error, |
|
|
|
|
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, |
|
|
|
@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_completion_queue *cq); |
|
|
|
|
|
|
|
|
|
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); |
|
|
|
|
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); |
|
|
|
|
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); |
|
|
|
|
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); |
|
|
|
|
|
|
|
|
|
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_completion_queue *cq, void *tag, |
|
|
|
@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
cq_next_data *cqd = DATA_FROM_CQ(cq); |
|
|
|
|
GPR_ASSERT(!cqd->shutdown_called); |
|
|
|
|
gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
cq_pluck_data *cqd = DATA_FROM_CQ(cq); |
|
|
|
|
GPR_ASSERT(!cqd->shutdown_called); |
|
|
|
|
gpr_ref(&cqd->pending_events); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { |
|
|
|
|
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); |
|
|
|
|
cq->outstanding_tags = |
|
|
|
|
gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * |
|
|
|
|
cq->outstanding_tag_capacity); |
|
|
|
|
} |
|
|
|
|
cq->outstanding_tags[cq->outstanding_tag_count++] = tag; |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
#endif |
|
|
|
|
cq->vtable->begin_op(cq, tag); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { |
|
|
|
|
int found = 0; |
|
|
|
@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { |
|
|
|
|
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
cq_next_data *cqd = DATA_FROM_CQ(cq); |
|
|
|
|
while (true) { |
|
|
|
|
gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events); |
|
|
|
|
if (count == 0) { |
|
|
|
|
return false; |
|
|
|
|
} else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
cq_pluck_data *cqd = DATA_FROM_CQ(cq); |
|
|
|
|
GPR_ASSERT(!cqd->shutdown_called); |
|
|
|
|
gpr_ref(&cqd->pending_events); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { |
|
|
|
|
#ifndef NDEBUG |
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
|
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { |
|
|
|
|
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); |
|
|
|
|
cq->outstanding_tags = |
|
|
|
|
gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * |
|
|
|
|
cq->outstanding_tag_capacity); |
|
|
|
|
} |
|
|
|
|
cq->outstanding_tags[cq->outstanding_tag_count++] = tag; |
|
|
|
|
gpr_mu_unlock(cq->mu); |
|
|
|
|
#endif |
|
|
|
|
return cq->vtable->begin_op(cq, tag); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
|
|
|
|
|
* completion |
|
|
|
|
* type of GRPC_CQ_NEXT) */ |
|
|
|
|