@ -204,15 +204,15 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
typedef struct cq_vtable {
grpc_cq_completion_type cq_completion_type ;
size_t ( * size ) ( ) ;
void ( * begin_op ) ( grpc_completion_queue * cc , void * tag ) ;
void ( * end_op ) ( grpc_exec_ctx * exec_ctx , grpc_completion_queue * cc , void * tag ,
void ( * begin_op ) ( grpc_completion_queue * cq , void * tag ) ;
void ( * end_op ) ( grpc_exec_ctx * exec_ctx , grpc_completion_queue * cq , void * tag ,
grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx , void * done_arg ,
grpc_cq_completion * storage ) ,
void * done_arg , grpc_cq_completion * storage ) ;
grpc_event ( * next ) ( grpc_completion_queue * cc , gpr_timespec deadline ,
grpc_event ( * next ) ( grpc_completion_queue * cq , gpr_timespec deadline ,
void * reserved ) ;
grpc_event ( * pluck ) ( grpc_completion_queue * cc , void * tag ,
grpc_event ( * pluck ) ( grpc_completion_queue * cq , void * tag ,
gpr_timespec deadline , void * reserved ) ;
} cq_vtable ;
@ -282,14 +282,14 @@ struct grpc_completion_queue {
/* Forward declarations */
static void cq_finish_shutdown ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc ) ;
grpc_completion_queue * cq ) ;
static size_t cq_size ( grpc_completion_queue * cc ) ;
static size_t cq_size ( grpc_completion_queue * cq ) ;
static void cq_begin_op ( grpc_completion_queue * cc , void * tag ) ;
static void cq_begin_op ( grpc_completion_queue * cq , void * tag ) ;
static void cq_end_op_for_next ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc , void * tag ,
grpc_completion_queue * cq , void * tag ,
grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx ,
void * done_arg ,
@ -297,17 +297,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
void * done_arg , grpc_cq_completion * storage ) ;
static void cq_end_op_for_pluck ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc , void * tag ,
grpc_completion_queue * cq , void * tag ,
grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx ,
void * done_arg ,
grpc_cq_completion * storage ) ,
void * done_arg , grpc_cq_completion * storage ) ;
static grpc_event cq_next ( grpc_completion_queue * cc , gpr_timespec deadline ,
static grpc_event cq_next ( grpc_completion_queue * cq , gpr_timespec deadline ,
void * reserved ) ;
static grpc_event cq_pluck ( grpc_completion_queue * cc , void * tag ,
static grpc_event cq_pluck ( grpc_completion_queue * cq , void * tag ,
gpr_timespec deadline , void * reserved ) ;
/* Completion queue vtables based on the completion-type */
@ -343,7 +343,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true);
gpr_free ( _ev ) ; \
}
static void on_pollset_shutdown_done ( grpc_exec_ctx * exec_ctx , void * cc ,
static void on_pollset_shutdown_done ( grpc_exec_ctx * exec_ctx , void * cq ,
grpc_error * error ) ;
static void cq_event_queue_init ( grpc_cq_event_queue * q ) {
@ -356,9 +356,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) {
gpr_mpscq_destroy ( & q - > queue ) ;
}
static void cq_event_queue_push ( grpc_cq_event_queue * q , grpc_cq_completion * c ) {
static bool cq_event_queue_push ( grpc_cq_event_queue * q , grpc_cq_completion * c ) {
gpr_mpscq_push ( & q - > queue , ( gpr_mpscq_node * ) c ) ;
gpr_atm_no_barrier_fetch_add ( & q - > num_queue_items , 1 ) ;
return gpr_atm_no_barrier_fetch_add ( & q - > num_queue_items , 1 ) = = 0 ;
}
static grpc_cq_completion * cq_event_queue_pop ( grpc_cq_event_queue * q ) {
@ -381,16 +381,16 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) {
return ( long ) gpr_atm_no_barrier_load ( & q - > num_queue_items ) ;
}
static size_t cq_size ( grpc_completion_queue * cc ) {
static size_t cq_size ( grpc_completion_queue * cq ) {
/* Size of the completion queue and the size of the pollset whose memory is
allocated right after that of completion queue */
return sizeof ( grpc_completion_queue ) + cc - > poller_vtable - > size ( ) ;
return sizeof ( grpc_completion_queue ) + cq - > poller_vtable - > size ( ) ;
}
grpc_completion_queue * grpc_completion_queue_create_internal (
grpc_cq_completion_type completion_type ,
grpc_cq_polling_type polling_type ) {
grpc_completion_queue * cc ;
grpc_completion_queue * cq ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_create_internal " , 0 ) ;
@ -403,13 +403,13 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
const cq_poller_vtable * poller_vtable =
& g_poller_vtable_by_poller_type [ polling_type ] ;
cc = gpr_zalloc ( sizeof ( grpc_completion_queue ) + poller_vtable - > size ( ) ) ;
cq_data * cqd = & cc - > data ;
cq = gpr_zalloc ( sizeof ( grpc_completion_queue ) + poller_vtable - > size ( ) ) ;
cq_data * cqd = & cq - > data ;
cc - > vtable = vtable ;
cc - > poller_vtable = poller_vtable ;
cq - > vtable = vtable ;
cq - > poller_vtable = poller_vtable ;
poller_vtable - > init ( POLLSET_FROM_CQ ( cc ) , & cc - > data . mu ) ;
poller_vtable - > init ( POLLSET_FROM_CQ ( cq ) , & cq - > data . mu ) ;
# ifndef NDEBUG
cqd - > outstanding_tags = NULL ;
@ -432,69 +432,69 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
cqd - > outstanding_tag_count = 0 ;
# endif
cq_event_queue_init ( & cqd - > queue ) ;
grpc_closure_init ( & cqd - > pollset_shutdown_done , on_pollset_shutdown_done , cc ,
grpc_closure_init ( & cqd - > pollset_shutdown_done , on_pollset_shutdown_done , cq ,
grpc_schedule_on_exec_ctx ) ;
GPR_TIMER_END ( " grpc_completion_queue_create_internal " , 0 ) ;
return cc ;
return cq ;
}
grpc_cq_completion_type grpc_get_cq_completion_type ( grpc_completion_queue * cc ) {
return cc - > vtable - > cq_completion_type ;
grpc_cq_completion_type grpc_get_cq_completion_type ( grpc_completion_queue * cq ) {
return cq - > vtable - > cq_completion_type ;
}
int grpc_get_cq_poll_num ( grpc_completion_queue * cc ) {
int grpc_get_cq_poll_num ( grpc_completion_queue * cq ) {
int cur_num_polls ;
gpr_mu_lock ( cc - > data . mu ) ;
cur_num_polls = cc - > data . num_polls ;
gpr_mu_unlock ( cc - > data . mu ) ;
gpr_mu_lock ( cq - > data . mu ) ;
cur_num_polls = cq - > data . num_polls ;
gpr_mu_unlock ( cq - > data . mu ) ;
return cur_num_polls ;
}
# ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref ( grpc_completion_queue * cc , const char * reason ,
void grpc_cq_internal_ref ( grpc_completion_queue * cq , const char * reason ,
const char * file , int line ) {
cq_data * cqd = & cc - > data ;
gpr_log ( file , line , GPR_LOG_SEVERITY_DEBUG , " CQ:%p ref %d -> %d %s " , cc ,
cq_data * cqd = & cq - > data ;
gpr_log ( file , line , GPR_LOG_SEVERITY_DEBUG , " CQ:%p ref %d -> %d %s " , cq ,
( int ) cqd - > owning_refs . count , ( int ) cqd - > owning_refs . count + 1 , reason ) ;
# else
void grpc_cq_internal_ref ( grpc_completion_queue * cc ) {
cq_data * cqd = & cc - > data ;
void grpc_cq_internal_ref ( grpc_completion_queue * cq ) {
cq_data * cqd = & cq - > data ;
# endif
gpr_ref ( & cqd - > owning_refs ) ;
}
static void on_pollset_shutdown_done ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_completion_queue * cc = arg ;
GRPC_CQ_INTERNAL_UNREF ( exec_ctx , cc , " pollset_destroy " ) ;
grpc_completion_queue * cq = arg ;
GRPC_CQ_INTERNAL_UNREF ( exec_ctx , cq , " pollset_destroy " ) ;
}
# ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_unref ( grpc_completion_queue * cc , const char * reason ,
void grpc_cq_internal_unref ( grpc_completion_queue * cq , const char * reason ,
const char * file , int line ) {
cq_data * cqd = & cc - > data ;
gpr_log ( file , line , GPR_LOG_SEVERITY_DEBUG , " CQ:%p unref %d -> %d %s " , cc ,
cq_data * cqd = & cq - > data ;
gpr_log ( file , line , GPR_LOG_SEVERITY_DEBUG , " CQ:%p unref %d -> %d %s " , cq ,
( int ) cqd - > owning_refs . count , ( int ) cqd - > owning_refs . count - 1 , reason ) ;
# else
void grpc_cq_internal_unref ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc ) {
cq_data * cqd = & cc - > data ;
grpc_completion_queue * cq ) {
cq_data * cqd = & cq - > data ;
# endif
if ( gpr_unref ( & cqd - > owning_refs ) ) {
GPR_ASSERT ( cqd - > completed_head . next = = ( uintptr_t ) & cqd - > completed_head ) ;
cc - > poller_vtable - > destroy ( exec_ctx , POLLSET_FROM_CQ ( cc ) ) ;
cq - > poller_vtable - > destroy ( exec_ctx , POLLSET_FROM_CQ ( cq ) ) ;
cq_event_queue_destroy ( & cqd - > queue ) ;
# ifndef NDEBUG
gpr_free ( cqd - > outstanding_tags ) ;
# endif
gpr_free ( cc ) ;
gpr_free ( cq ) ;
}
}
static void cq_begin_op ( grpc_completion_queue * cc , void * tag ) {
cq_data * cqd = & cc - > data ;
static void cq_begin_op ( grpc_completion_queue * cq , void * tag ) {
cq_data * cqd = & cq - > data ;
# ifndef NDEBUG
gpr_mu_lock ( cqd - > mu ) ;
GPR_ASSERT ( ! cqd - > shutdown_called ) ;
@ -511,13 +511,13 @@ static void cq_begin_op(grpc_completion_queue *cc, void *tag) {
gpr_ref ( & cqd - > pending_events ) ;
}
void grpc_cq_begin_op ( grpc_completion_queue * cc , void * tag ) {
cc - > vtable - > begin_op ( cc , tag ) ;
void grpc_cq_begin_op ( grpc_completion_queue * cq , void * tag ) {
cq - > vtable - > begin_op ( cq , tag ) ;
}
# ifndef NDEBUG
static void cq_check_tag ( grpc_completion_queue * cc , void * tag , bool lock_cq ) {
cq_data * cqd = & cc - > data ;
static void cq_check_tag ( grpc_completion_queue * cq , void * tag , bool lock_cq ) {
cq_data * cqd = & cq - > data ;
int found = 0 ;
if ( lock_cq ) {
gpr_mu_lock ( cqd - > mu ) ;
@ -540,13 +540,13 @@ static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {
GPR_ASSERT ( found ) ;
}
# else
static void cq_check_tag ( grpc_completion_queue * cc , void * tag , bool lock_cq ) { }
static void cq_check_tag ( grpc_completion_queue * cq , void * tag , bool lock_cq ) { }
# endif
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
* type of GRPC_CQ_NEXT ) */
static void cq_end_op_for_next ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc , void * tag ,
grpc_completion_queue * cq , void * tag ,
grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx ,
void * done_arg ,
@ -559,16 +559,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
error ! = GRPC_ERROR_NONE ) ) {
const char * errmsg = grpc_error_string ( error ) ;
GRPC_API_TRACE (
" cq_end_op_for_next(exec_ctx=%p, cc =%p, tag=%p, error=%s, "
" cq_end_op_for_next(exec_ctx=%p, cq =%p, tag=%p, error=%s, "
" done=%p, done_arg=%p, storage=%p) " ,
7 , ( exec_ctx , cc , tag , errmsg , done , done_arg , storage ) ) ;
7 , ( exec_ctx , cq , tag , errmsg , done , done_arg , storage ) ) ;
if ( GRPC_TRACER_ON ( grpc_trace_operation_failures ) & &
error ! = GRPC_ERROR_NONE ) {
gpr_log ( GPR_ERROR , " Operation failed: tag=%p, error=%s " , tag , errmsg ) ;
}
}
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
int is_success = ( error = = GRPC_ERROR_NONE ) ;
storage - > tag = tag ;
@ -576,17 +576,20 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
storage - > done_arg = done_arg ;
storage - > next = ( uintptr_t ) ( is_success ) ;
cq_check_tag ( cc , tag , true ) ; /* Used in debug builds only */
cq_check_tag ( cq , tag , true ) ; /* Used in debug builds only */
/* Add the completion to the queue */
cq_event_queue_push ( & cqd - > queue , storage ) ;
bool is_first = cq_event_queue_push ( & cqd - > queue , storage ) ;
gpr_atm_no_barrier_fetch_add ( & cqd - > things_queued_ever , 1 ) ;
gpr_mu_lock ( cqd - > mu ) ;
int shutdown = gpr_unref ( & cqd - > pending_events ) ;
if ( ! shutdown ) {
grpc_error * kick_error = cc - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cc ) , NULL ) ;
/* Only kick if this is the first item queued */
grpc_error * kick_error =
is_first ? cq - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cq ) , NULL )
: GRPC_ERROR_NONE ;
gpr_mu_unlock ( cqd - > mu ) ;
if ( kick_error ! = GRPC_ERROR_NONE ) {
@ -596,7 +599,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF ( kick_error ) ;
}
} else {
cq_finish_shutdown ( exec_ctx , cc ) ;
cq_finish_shutdown ( exec_ctx , cq ) ;
gpr_mu_unlock ( cqd - > mu ) ;
}
@ -608,13 +611,13 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion
* type of GRPC_CQ_PLUCK ) */
static void cq_end_op_for_pluck ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc , void * tag ,
grpc_completion_queue * cq , void * tag ,
grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx ,
void * done_arg ,
grpc_cq_completion * storage ) ,
void * done_arg , grpc_cq_completion * storage ) {
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
int is_success = ( error = = GRPC_ERROR_NONE ) ;
GPR_TIMER_BEGIN ( " cq_end_op_for_pluck " , 0 ) ;
@ -624,9 +627,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
error ! = GRPC_ERROR_NONE ) ) {
const char * errmsg = grpc_error_string ( error ) ;
GRPC_API_TRACE (
" cq_end_op_for_pluck(exec_ctx=%p, cc =%p, tag=%p, error=%s, "
" cq_end_op_for_pluck(exec_ctx=%p, cq =%p, tag=%p, error=%s, "
" done=%p, done_arg=%p, storage=%p) " ,
7 , ( exec_ctx , cc , tag , errmsg , done , done_arg , storage ) ) ;
7 , ( exec_ctx , cq , tag , errmsg , done , done_arg , storage ) ) ;
if ( GRPC_TRACER_ON ( grpc_trace_operation_failures ) & &
error ! = GRPC_ERROR_NONE ) {
gpr_log ( GPR_ERROR , " Operation failed: tag=%p, error=%s " , tag , errmsg ) ;
@ -639,7 +642,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
storage - > next = ( ( uintptr_t ) & cqd - > completed_head ) | ( ( uintptr_t ) ( is_success ) ) ;
gpr_mu_lock ( cqd - > mu ) ;
cq_check_tag ( cc , tag , false ) ; /* Used in debug builds only */
cq_check_tag ( cq , tag , false ) ; /* Used in debug builds only */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add ( & cqd - > things_queued_ever , 1 ) ;
@ -658,7 +661,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
}
grpc_error * kick_error =
cc - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cc ) , pluck_worker ) ;
cq - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cq ) , pluck_worker ) ;
gpr_mu_unlock ( cqd - > mu ) ;
@ -669,7 +672,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF ( kick_error ) ;
}
} else {
cq_finish_shutdown ( exec_ctx , cc ) ;
cq_finish_shutdown ( exec_ctx , cq ) ;
gpr_mu_unlock ( cqd - > mu ) ;
}
@ -678,12 +681,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF ( error ) ;
}
void grpc_cq_end_op ( grpc_exec_ctx * exec_ctx , grpc_completion_queue * cc ,
void grpc_cq_end_op ( grpc_exec_ctx * exec_ctx , grpc_completion_queue * cq ,
void * tag , grpc_error * error ,
void ( * done ) ( grpc_exec_ctx * exec_ctx , void * done_arg ,
grpc_cq_completion * storage ) ,
void * done_arg , grpc_cq_completion * storage ) {
cc - > vtable - > end_op ( exec_ctx , cc , tag , error , done , done_arg , storage ) ;
cq - > vtable - > end_op ( exec_ctx , cq , tag , error , done , done_arg , storage ) ;
}
typedef struct {
@ -722,10 +725,10 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
}
# ifndef NDEBUG
static void dump_pending_tags ( grpc_completion_queue * cc ) {
static void dump_pending_tags ( grpc_completion_queue * cq ) {
if ( ! GRPC_TRACER_ON ( grpc_trace_pending_tags ) ) return ;
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
gpr_strvec v ;
gpr_strvec_init ( & v ) ;
@ -743,37 +746,37 @@ static void dump_pending_tags(grpc_completion_queue *cc) {
gpr_free ( out ) ;
}
# else
static void dump_pending_tags ( grpc_completion_queue * cc ) { }
static void dump_pending_tags ( grpc_completion_queue * cq ) { }
# endif
static grpc_event cq_next ( grpc_completion_queue * cc , gpr_timespec deadline ,
static grpc_event cq_next ( grpc_completion_queue * cq , gpr_timespec deadline ,
void * reserved ) {
grpc_event ret ;
gpr_timespec now ;
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_next " , 0 ) ;
GRPC_API_TRACE (
" grpc_completion_queue_next( "
" cc =%p, "
" cq =%p, "
" deadline=gpr_timespec { tv_sec: % " PRId64
" , tv_nsec: %d, clock_type: %d }, "
" reserved=%p) " ,
5 , ( cc , deadline . tv_sec , deadline . tv_nsec , ( int ) deadline . clock_type ,
5 , ( cq , deadline . tv_sec , deadline . tv_nsec , ( int ) deadline . clock_type ,
reserved ) ) ;
GPR_ASSERT ( ! reserved ) ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
deadline = gpr_convert_clock_type ( deadline , GPR_CLOCK_MONOTONIC ) ;
GRPC_CQ_INTERNAL_REF ( cc , " next " ) ;
GRPC_CQ_INTERNAL_REF ( cq , " next " ) ;
cq_is_finished_arg is_finished_arg = {
. last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ,
. cq = cc ,
. cq = cq ,
. deadline = deadline ,
. stolen_completion = NULL ,
. tag = NULL ,
@ -819,8 +822,8 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
empty . If so , keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if ( cq_event_queue_num_items ( & cqd - > queue ) > 0 ) {
/* Go to the beginning of the loop. No point doing a poll because
( cc - > shutdown = = true ) is only possible when there is no pending work
( i . e cc - > pending_events = = 0 ) and any outstanding grpc_cq_completion
( cq - > shutdown = = true ) is only possible when there is no pending work
( i . e cq - > pending_events = = 0 ) and any outstanding grpc_cq_completion
events are already queued on this cq */
continue ;
}
@ -834,14 +837,14 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
if ( ! is_finished_arg . first_loop & & gpr_time_cmp ( now , deadline ) > = 0 ) {
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
break ;
}
/* The main polling work happens in grpc_pollset_work */
gpr_mu_lock ( cqd - > mu ) ;
cqd - > num_polls + + ;
grpc_error * err = cc - > poller_vtable - > work ( & exec_ctx , POLLSET_FROM_CQ ( cc ) ,
grpc_error * err = cq - > poller_vtable - > work ( & exec_ctx , POLLSET_FROM_CQ ( cq ) ,
NULL , now , iteration_deadline ) ;
gpr_mu_unlock ( cqd - > mu ) ;
@ -852,30 +855,36 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline,
GRPC_ERROR_UNREF ( err ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
break ;
}
is_finished_arg . first_loop = false ;
}
GRPC_SURFACE_TRACE_RETURNED_EVENT ( cc , & ret ) ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cc , " next " ) ;
GRPC_SURFACE_TRACE_RETURNED_EVENT ( cq , & ret ) ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cq , " next " ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
GPR_ASSERT ( is_finished_arg . stolen_completion = = NULL ) ;
if ( cq_event_queue_num_items ( & cqd - > queue ) > 0 ) {
gpr_mu_lock ( cqd - > mu ) ;
cq - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cq ) , NULL ) ;
gpr_mu_unlock ( cqd - > mu ) ;
}
GPR_TIMER_END ( " grpc_completion_queue_next " , 0 ) ;
return ret ;
}
grpc_event grpc_completion_queue_next ( grpc_completion_queue * cc ,
grpc_event grpc_completion_queue_next ( grpc_completion_queue * cq ,
gpr_timespec deadline , void * reserved ) {
return cc - > vtable - > next ( cc , deadline , reserved ) ;
return cq - > vtable - > next ( cq , deadline , reserved ) ;
}
static int add_plucker ( grpc_completion_queue * cc , void * tag ,
static int add_plucker ( grpc_completion_queue * cq , void * tag ,
grpc_pollset_worker * * worker ) {
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
if ( cqd - > num_pluckers = = GRPC_MAX_COMPLETION_QUEUE_PLUCKERS ) {
return 0 ;
}
@ -885,9 +894,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag,
return 1 ;
}
static void del_plucker ( grpc_completion_queue * cc , void * tag ,
static void del_plucker ( grpc_completion_queue * cq , void * tag ,
grpc_pollset_worker * * worker ) {
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
for ( int i = 0 ; i < cqd - > num_pluckers ; i + + ) {
if ( cqd - > pluckers [ i ] . tag = = tag & & cqd - > pluckers [ i ] . worker = = worker ) {
cqd - > num_pluckers - - ;
@ -931,39 +940,39 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
gpr_time_cmp ( a - > deadline , gpr_now ( a - > deadline . clock_type ) ) < 0 ;
}
static grpc_event cq_pluck ( grpc_completion_queue * cc , void * tag ,
static grpc_event cq_pluck ( grpc_completion_queue * cq , void * tag ,
gpr_timespec deadline , void * reserved ) {
grpc_event ret ;
grpc_cq_completion * c ;
grpc_cq_completion * prev ;
grpc_pollset_worker * worker = NULL ;
gpr_timespec now ;
cq_data * cqd = & cc - > data ;
cq_data * cqd = & cq - > data ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_pluck " , 0 ) ;
if ( GRPC_TRACER_ON ( grpc_cq_pluck_trace ) ) {
GRPC_API_TRACE (
" grpc_completion_queue_pluck( "
" cc =%p, tag=%p, "
" cq =%p, tag=%p, "
" deadline=gpr_timespec { tv_sec: % " PRId64
" , tv_nsec: %d, clock_type: %d }, "
" reserved=%p) " ,
6 , ( cc , tag , deadline . tv_sec , deadline . tv_nsec ,
6 , ( cq , tag , deadline . tv_sec , deadline . tv_nsec ,
( int ) deadline . clock_type , reserved ) ) ;
}
GPR_ASSERT ( ! reserved ) ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
deadline = gpr_convert_clock_type ( deadline , GPR_CLOCK_MONOTONIC ) ;
GRPC_CQ_INTERNAL_REF ( cc , " pluck " ) ;
GRPC_CQ_INTERNAL_REF ( cq , " pluck " ) ;
gpr_mu_lock ( cqd - > mu ) ;
cq_is_finished_arg is_finished_arg = {
. last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ,
. cq = cc ,
. cq = cq ,
. deadline = deadline ,
. stolen_completion = NULL ,
. tag = tag ,
@ -1004,7 +1013,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
ret . type = GRPC_QUEUE_SHUTDOWN ;
break ;
}
if ( ! add_plucker ( cc , tag , & worker ) ) {
if ( ! add_plucker ( cq , tag , & worker ) ) {
gpr_log ( GPR_DEBUG ,
" Too many outstanding grpc_completion_queue_pluck calls: maximum "
" is %d " ,
@ -1013,24 +1022,24 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
memset ( & ret , 0 , sizeof ( ret ) ) ;
/* TODO(ctiller): should we use a different result here */
ret . type = GRPC_QUEUE_TIMEOUT ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
break ;
}
now = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
if ( ! is_finished_arg . first_loop & & gpr_time_cmp ( now , deadline ) > = 0 ) {
del_plucker ( cc , tag , & worker ) ;
del_plucker ( cq , tag , & worker ) ;
gpr_mu_unlock ( cqd - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
break ;
}
cqd - > num_polls + + ;
grpc_error * err = cc - > poller_vtable - > work ( & exec_ctx , POLLSET_FROM_CQ ( cc ) ,
grpc_error * err = cq - > poller_vtable - > work ( & exec_ctx , POLLSET_FROM_CQ ( cq ) ,
& worker , now , deadline ) ;
if ( err ! = GRPC_ERROR_NONE ) {
del_plucker ( cc , tag , & worker ) ;
del_plucker ( cq , tag , & worker ) ;
gpr_mu_unlock ( cqd - > mu ) ;
const char * msg = grpc_error_string ( err ) ;
gpr_log ( GPR_ERROR , " Completion queue pluck failed: %s " , msg ) ;
@ -1038,15 +1047,15 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag,
GRPC_ERROR_UNREF ( err ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
dump_pending_tags ( cc ) ;
dump_pending_tags ( cq ) ;
break ;
}
is_finished_arg . first_loop = false ;
del_plucker ( cc , tag , & worker ) ;
del_plucker ( cq , tag , & worker ) ;
}
done :
GRPC_SURFACE_TRACE_RETURNED_EVENT ( cc , & ret ) ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cc , " pluck " ) ;
GRPC_SURFACE_TRACE_RETURNED_EVENT ( cq , & ret ) ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cq , " pluck " ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
GPR_ASSERT ( is_finished_arg . stolen_completion = = NULL ) ;
@ -1055,9 +1064,9 @@ done:
return ret ;
}
grpc_event grpc_completion_queue_pluck ( grpc_completion_queue * cc , void * tag ,
grpc_event grpc_completion_queue_pluck ( grpc_completion_queue * cq , void * tag ,
gpr_timespec deadline , void * reserved ) {
return cc - > vtable - > pluck ( cc , tag , deadline , reserved ) ;
return cq - > vtable - > pluck ( cq , tag , deadline , reserved ) ;
}
/* Finishes the completion queue shutdown. This means that there are no more
@ -1067,24 +1076,24 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
- grpc_completion_queue_shutdown ( ) MUST have been called before calling
this function */
static void cq_finish_shutdown ( grpc_exec_ctx * exec_ctx ,
grpc_completion_queue * cc ) {
cq_data * cqd = & cc - > data ;
grpc_completion_queue * cq ) {
cq_data * cqd = & cq - > data ;
GPR_ASSERT ( cqd - > shutdown_called ) ;
GPR_ASSERT ( ! gpr_atm_no_barrier_load ( & cqd - > shutdown ) ) ;
gpr_atm_no_barrier_store ( & cqd - > shutdown , 1 ) ;
cc - > poller_vtable - > shutdown ( exec_ctx , POLLSET_FROM_CQ ( cc ) ,
cq - > poller_vtable - > shutdown ( exec_ctx , POLLSET_FROM_CQ ( cq ) ,
& cqd - > pollset_shutdown_done ) ;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
to zero here , then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown ( grpc_completion_queue * cc ) {
void grpc_completion_queue_shutdown ( grpc_completion_queue * cq ) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_shutdown " , 0 ) ;
GRPC_API_TRACE ( " grpc_completion_queue_shutdown(cc =%p) " , 1 , ( cc ) ) ;
cq_data * cqd = & cc - > data ;
GRPC_API_TRACE ( " grpc_completion_queue_shutdown(cq =%p) " , 1 , ( cq ) ) ;
cq_data * cqd = & cq - > data ;
gpr_mu_lock ( cqd - > mu ) ;
if ( cqd - > shutdown_called ) {
@ -1094,46 +1103,46 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
}
cqd - > shutdown_called = 1 ;
if ( gpr_unref ( & cqd - > pending_events ) ) {
cq_finish_shutdown ( & exec_ctx , cc ) ;
cq_finish_shutdown ( & exec_ctx , cq ) ;
}
gpr_mu_unlock ( cqd - > mu ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
GPR_TIMER_END ( " grpc_completion_queue_shutdown " , 0 ) ;
}
void grpc_completion_queue_destroy ( grpc_completion_queue * cc ) {
GRPC_API_TRACE ( " grpc_completion_queue_destroy(cc =%p) " , 1 , ( cc ) ) ;
void grpc_completion_queue_destroy ( grpc_completion_queue * cq ) {
GRPC_API_TRACE ( " grpc_completion_queue_destroy(cq =%p) " , 1 , ( cq ) ) ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_destroy " , 0 ) ;
grpc_completion_queue_shutdown ( cc ) ;
grpc_completion_queue_shutdown ( cq ) ;
/* TODO (sreek): This should not ideally be here. Refactor it into the
* cq_vtable ( perhaps have a create / destroy methods in the cq vtable ) */
if ( cc - > vtable - > cq_completion_type = = GRPC_CQ_NEXT ) {
GPR_ASSERT ( cq_event_queue_num_items ( & cc - > data . queue ) = = 0 ) ;
if ( cq - > vtable - > cq_completion_type = = GRPC_CQ_NEXT ) {
GPR_ASSERT ( cq_event_queue_num_items ( & cq - > data . queue ) = = 0 ) ;
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cc , " destroy " ) ;
GRPC_CQ_INTERNAL_UNREF ( & exec_ctx , cq , " destroy " ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
GPR_TIMER_END ( " grpc_completion_queue_destroy " , 0 ) ;
}
grpc_pollset * grpc_cq_pollset ( grpc_completion_queue * cc ) {
return cc - > poller_vtable - > can_get_pollset ? POLLSET_FROM_CQ ( cc ) : NULL ;
grpc_pollset * grpc_cq_pollset ( grpc_completion_queue * cq ) {
return cq - > poller_vtable - > can_get_pollset ? POLLSET_FROM_CQ ( cq ) : NULL ;
}
grpc_completion_queue * grpc_cq_from_pollset ( grpc_pollset * ps ) {
return CQ_FROM_POLLSET ( ps ) ;
}
void grpc_cq_mark_server_cq ( grpc_completion_queue * cc ) {
cc - > data . is_server_cq = 1 ;
void grpc_cq_mark_server_cq ( grpc_completion_queue * cq ) {
cq - > data . is_server_cq = 1 ;
}
bool grpc_cq_is_server_cq ( grpc_completion_queue * cc ) {
return cc - > data . is_server_cq ;
bool grpc_cq_is_server_cq ( grpc_completion_queue * cq ) {
return cq - > data . is_server_cq ;
}
bool grpc_cq_can_listen ( grpc_completion_queue * cc ) {
return cc - > poller_vtable - > can_listen ;
bool grpc_cq_can_listen ( grpc_completion_queue * cq ) {
return cq - > poller_vtable - > can_listen ;
}