|
|
|
@ -263,12 +263,6 @@ typedef struct cq_callback_data { |
|
|
|
|
useful for avoiding locks to check the queue */ |
|
|
|
|
gpr_atm things_queued_ever; |
|
|
|
|
|
|
|
|
|
/** 0 initially. 1 once we completed shutting */ |
|
|
|
|
/* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
|
|
|
|
|
* (pending_events == 0). So consider removing this in future and use |
|
|
|
|
* pending_events */ |
|
|
|
|
gpr_atm shutdown; |
|
|
|
|
|
|
|
|
|
/** 0 initially. 1 once we initiated shutdown */ |
|
|
|
|
bool shutdown_called; |
|
|
|
|
|
|
|
|
@ -308,6 +302,12 @@ static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag); |
|
|
|
|
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag); |
|
|
|
|
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag); |
|
|
|
|
|
|
|
|
|
// A cq_end_op function is called when an operation on a given CQ with
|
|
|
|
|
// a given tag has completed. The storage argument is a reference to the
|
|
|
|
|
// space reserved for this completion as it is placed into the corresponding
|
|
|
|
|
// queue. The done argument is a callback that will be invoked when it is
|
|
|
|
|
// safe to free up that storage. The storage MUST NOT be freed until the
|
|
|
|
|
// done callback is invoked.
|
|
|
|
|
static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag, |
|
|
|
|
grpc_error* error, |
|
|
|
|
void (*done)(void* done_arg, |
|
|
|
@ -332,8 +332,11 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline, |
|
|
|
|
static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, |
|
|
|
|
gpr_timespec deadline, void* reserved); |
|
|
|
|
|
|
|
|
|
static void cq_init_next(void* data, grpc_core::CQCallbackInterface*); |
|
|
|
|
static void cq_init_pluck(void* data, grpc_core::CQCallbackInterface*); |
|
|
|
|
// Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
|
|
|
|
|
static void cq_init_next(void* data, |
|
|
|
|
grpc_core::CQCallbackInterface* shutdown_callback); |
|
|
|
|
static void cq_init_pluck(void* data, |
|
|
|
|
grpc_core::CQCallbackInterface* shutdown_callback); |
|
|
|
|
static void cq_init_callback(void* data, |
|
|
|
|
grpc_core::CQCallbackInterface* shutdown_callback); |
|
|
|
|
static void cq_destroy_next(void* data); |
|
|
|
@ -494,7 +497,11 @@ grpc_completion_queue* grpc_completion_queue_create_internal( |
|
|
|
|
return cq; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cq_init_next(void* ptr, grpc_core::CQCallbackInterface*) { |
|
|
|
|
static void cq_init_next(void* ptr, |
|
|
|
|
grpc_core::CQCallbackInterface* shutdown_callback) { |
|
|
|
|
// shutdown_callback should not be provided to this CQ variant
|
|
|
|
|
GPR_ASSERT(shutdown_callback == nullptr); |
|
|
|
|
|
|
|
|
|
cq_next_data* cqd = static_cast<cq_next_data*>(ptr); |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
@ -509,7 +516,11 @@ static void cq_destroy_next(void* ptr) { |
|
|
|
|
cq_event_queue_destroy(&cqd->queue); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cq_init_pluck(void* ptr, grpc_core::CQCallbackInterface*) { |
|
|
|
|
static void cq_init_pluck(void* ptr, |
|
|
|
|
grpc_core::CQCallbackInterface* shutdown_callback) { |
|
|
|
|
// shutdown_callback should not be provided to this CQ variant
|
|
|
|
|
GPR_ASSERT(shutdown_callback == nullptr); |
|
|
|
|
|
|
|
|
|
cq_pluck_data* cqd = static_cast<cq_pluck_data*>(ptr); |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
@ -531,7 +542,6 @@ static void cq_init_callback( |
|
|
|
|
cq_callback_data* cqd = static_cast<cq_callback_data*>(ptr); |
|
|
|
|
/* Initial count is dropped by grpc_completion_queue_shutdown */ |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->pending_events, 1); |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->shutdown, 0); |
|
|
|
|
cqd->shutdown_called = false; |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); |
|
|
|
|
cqd->shutdown_callback = shutdown_callback; |
|
|
|
@ -838,7 +848,8 @@ static void cq_end_op_for_callback( |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* We don't care for the storage content */ |
|
|
|
|
// The callback-based CQ isn't really a queue at all and thus has no need
|
|
|
|
|
// for reserved storage. Invoke the done callback right away to release it.
|
|
|
|
|
done(done_arg, storage); |
|
|
|
|
|
|
|
|
|
gpr_mu_lock(cq->mu); |
|
|
|
@ -1336,8 +1347,6 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { |
|
|
|
|
auto* callback = cqd->shutdown_callback; |
|
|
|
|
|
|
|
|
|
GPR_ASSERT(cqd->shutdown_called); |
|
|
|
|
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); |
|
|
|
|
gpr_atm_no_barrier_store(&cqd->shutdown, 1); |
|
|
|
|
|
|
|
|
|
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); |
|
|
|
|
callback->Run(true); |
|
|
|
@ -1347,7 +1356,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) { |
|
|
|
|
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq); |
|
|
|
|
|
|
|
|
|
/* Need an extra ref for cq here because:
|
|
|
|
|
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. |
|
|
|
|
* We call cq_finish_shutdown_callback() below, which calls pollset shutdown. |
|
|
|
|
* Pollset shutdown decrements the cq ref count which can potentially destroy |
|
|
|
|
* the cq (if that happens to be the last ref). |
|
|
|
|
* Creating an extra ref here prevents the cq from getting destroyed while |
|
|
|
|