@ -33,6 +33,7 @@
# include "src/core/lib/gpr/spinlock.h"
# include "src/core/lib/gpr/spinlock.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gpr/string.h"
# include "src/core/lib/gpr/tls.h"
# include "src/core/lib/gpr/tls.h"
# include "src/core/lib/gprpp/atomic.h"
# include "src/core/lib/iomgr/pollset.h"
# include "src/core/lib/iomgr/pollset.h"
# include "src/core/lib/iomgr/timer.h"
# include "src/core/lib/iomgr/timer.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/profiling/timers.h"
@ -44,6 +45,8 @@ grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
grpc_core : : DebugOnlyTraceFlag grpc_trace_pending_tags ( false , " pending_tags " ) ;
grpc_core : : DebugOnlyTraceFlag grpc_trace_pending_tags ( false , " pending_tags " ) ;
grpc_core : : DebugOnlyTraceFlag grpc_trace_cq_refcount ( false , " cq_refcount " ) ;
grpc_core : : DebugOnlyTraceFlag grpc_trace_cq_refcount ( false , " cq_refcount " ) ;
namespace {
// Specifies a cq thread local cache.
// Specifies a cq thread local cache.
// The first event that occurs on a thread
// The first event that occurs on a thread
// with a cq cache will go into that cache, and
// with a cq cache will go into that cache, and
@ -84,24 +87,22 @@ typedef struct {
grpc_closure * shutdown ;
grpc_closure * shutdown ;
} non_polling_poller ;
} non_polling_poller ;
static size_t non_polling_poller_size ( void ) {
size_t non_polling_poller_size ( void ) { return sizeof ( non_polling_poller ) ; }
return sizeof ( non_polling_poller ) ;
}
static void non_polling_poller_init ( grpc_pollset * pollset , gpr_mu * * mu ) {
void non_polling_poller_init ( grpc_pollset * pollset , gpr_mu * * mu ) {
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
gpr_mu_init ( & npp - > mu ) ;
gpr_mu_init ( & npp - > mu ) ;
* mu = & npp - > mu ;
* mu = & npp - > mu ;
}
}
static void non_polling_poller_destroy ( grpc_pollset * pollset ) {
void non_polling_poller_destroy ( grpc_pollset * pollset ) {
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
gpr_mu_destroy ( & npp - > mu ) ;
gpr_mu_destroy ( & npp - > mu ) ;
}
}
static grpc_error * non_polling_poller_work ( grpc_pollset * pollset ,
grpc_error * non_polling_poller_work ( grpc_pollset * pollset ,
grpc_pollset_worker * * worker ,
grpc_pollset_worker * * worker ,
grpc_millis deadline ) {
grpc_millis deadline ) {
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
non_polling_poller * npp = reinterpret_cast < non_polling_poller * > ( pollset ) ;
if ( npp - > shutdown ) return GRPC_ERROR_NONE ;
if ( npp - > shutdown ) return GRPC_ERROR_NONE ;
if ( npp - > kicked_without_poller ) {
if ( npp - > kicked_without_poller ) {
@ -141,8 +142,8 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset,
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
static grpc_error * non_polling_poller_kick (
grpc_error * non_polling_poller_kick ( grpc_pollset * pollset ,
grpc_pollset * pollset , grpc_pollset_worker * specific_worker ) {
grpc_pollset_worker * specific_worker ) {
non_polling_poller * p = reinterpret_cast < non_polling_poller * > ( pollset ) ;
non_polling_poller * p = reinterpret_cast < non_polling_poller * > ( pollset ) ;
if ( specific_worker = = nullptr )
if ( specific_worker = = nullptr )
specific_worker = reinterpret_cast < grpc_pollset_worker * > ( p - > root ) ;
specific_worker = reinterpret_cast < grpc_pollset_worker * > ( p - > root ) ;
@ -159,8 +160,7 @@ static grpc_error* non_polling_poller_kick(
return GRPC_ERROR_NONE ;
return GRPC_ERROR_NONE ;
}
}
static void non_polling_poller_shutdown ( grpc_pollset * pollset ,
void non_polling_poller_shutdown ( grpc_pollset * pollset , grpc_closure * closure ) {
grpc_closure * closure ) {
non_polling_poller * p = reinterpret_cast < non_polling_poller * > ( pollset ) ;
non_polling_poller * p = reinterpret_cast < non_polling_poller * > ( pollset ) ;
GPR_ASSERT ( closure ! = nullptr ) ;
GPR_ASSERT ( closure ! = nullptr ) ;
p - > shutdown = closure ;
p - > shutdown = closure ;
@ -175,7 +175,7 @@ static void non_polling_poller_shutdown(grpc_pollset* pollset,
}
}
}
}
static const cq_poller_vtable g_poller_vtable_by_poller_type [ ] = {
const cq_poller_vtable g_poller_vtable_by_poller_type [ ] = {
/* GRPC_CQ_DEFAULT_POLLING */
/* GRPC_CQ_DEFAULT_POLLING */
{ true , true , grpc_pollset_size , grpc_pollset_init , grpc_pollset_kick ,
{ true , true , grpc_pollset_size , grpc_pollset_init , grpc_pollset_kick ,
grpc_pollset_work , grpc_pollset_shutdown , grpc_pollset_destroy } ,
grpc_pollset_work , grpc_pollset_shutdown , grpc_pollset_destroy } ,
@ -188,7 +188,9 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
non_polling_poller_shutdown , non_polling_poller_destroy } ,
non_polling_poller_shutdown , non_polling_poller_destroy } ,
} ;
} ;
typedef struct cq_vtable {
} // namespace
struct cq_vtable {
grpc_cq_completion_type cq_completion_type ;
grpc_cq_completion_type cq_completion_type ;
size_t data_size ;
size_t data_size ;
void ( * init ) ( void * data ,
void ( * init ) ( void * data ,
@ -203,80 +205,116 @@ typedef struct cq_vtable {
void * reserved ) ;
void * reserved ) ;
grpc_event ( * pluck ) ( grpc_completion_queue * cq , void * tag ,
grpc_event ( * pluck ) ( grpc_completion_queue * cq , void * tag ,
gpr_timespec deadline , void * reserved ) ;
gpr_timespec deadline , void * reserved ) ;
} cq_vtable ;
} ;
namespace {
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
* ( a lockfree multiproducer single consumer queue ) . It uses a queue_lock
* ( a lockfree multiproducer single consumer queue ) . It uses a queue_lock
* to support multiple consumers .
* to support multiple consumers .
* Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
* Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
typedef struct grpc_cq_event_queue {
class CqEventQueue {
public :
CqEventQueue ( ) { gpr_mpscq_init ( & queue_ ) ; }
~ CqEventQueue ( ) { gpr_mpscq_destroy ( & queue_ ) ; }
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
intptr_t num_items ( ) const {
return num_queue_items_ . Load ( grpc_core : : MemoryOrder : : RELAXED ) ;
}
bool Push ( grpc_cq_completion * c ) ;
grpc_cq_completion * Pop ( ) ;
private :
/* Spinlock to serialize consumers i.e pop() operations */
/* Spinlock to serialize consumers i.e pop() operations */
gpr_spinlock queue_lock ;
gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER ;
gpr_mpscq queue ;
gpr_mpscq queue_ ;
/* A lazy counter of number of items in the queue. This is NOT atomically
/* A lazy counter of number of items in the queue. This is NOT atomically
incremented / decremented along with push / pop operations and hence is only
incremented / decremented along with push / pop operations and hence is only
eventually consistent */
eventually consistent */
gpr_atm num_queue_items ;
grpc_core : : Atomic < intptr_t > num_queue_items_ { 0 } ;
} grpc_cq_event_queue ;
} ;
struct cq_next_data {
~ cq_next_data ( ) { GPR_ASSERT ( queue . num_items ( ) = = 0 ) ; }
typedef struct cq_next_data {
/** Completed events for completion-queues of type GRPC_CQ_NEXT */
/** Completed events for completion-queues of type GRPC_CQ_NEXT */
grpc_cq_event_queue queue ;
CqEventQ ueue queue ;
/** Counter of how many things have ever been queued on this completion queue
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever ;
grpc_core : : Atomic < intptr_t > things_queued_ever { 0 } ;
/* Number of outstanding events (+1 if not shut down) */
/** Number of outstanding events (+1 if not shut down)
gpr_atm pending_events ;
Initial count is dropped by grpc_completion_queue_shutdown */
grpc_core : : Atomic < intptr_t > pending_events { 1 } ;
/** 0 initially. 1 once we initiated shutdown */
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called ;
bool shutdown_called = false ;
} cq_next_data ;
} ;
struct cq_pluck_data {
cq_pluck_data ( ) {
completed_tail = & completed_head ;
completed_head . next = reinterpret_cast < uintptr_t > ( completed_tail ) ;
}
~ cq_pluck_data ( ) {
GPR_ASSERT ( completed_head . next = =
reinterpret_cast < uintptr_t > ( & completed_head ) ) ;
}
typedef struct cq_pluck_data {
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
/** Completed events for completion-queues of type GRPC_CQ_PLUCK */
grpc_cq_completion completed_head ;
grpc_cq_completion completed_head ;
grpc_cq_completion * completed_tail ;
grpc_cq_completion * completed_tail ;
/** Number of pending events (+1 if we're not shutdown) */
/** Number of pending events (+1 if we're not shutdown).
gpr_atm pending_events ;
Initial count is dropped by grpc_completion_queue_shutdown . */
grpc_core : : Atomic < intptr_t > pending_events { 1 } ;
/** Counter of how many things have ever been queued on this completion queue
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever ;
grpc_core : : Atomic < intptr_t > things_queued_ever { 0 } ;
/** 0 initially. 1 once we completed shutting */
/** 0 initially. 1 once we completed shutting */
/* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
/* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
* ( pending_events = = 0 ) . So consider removing this in future and use
* ( pending_events = = 0 ) . So consider removing this in future and use
* pending_events */
* pending_events */
gpr_atm shutdown ;
grpc_core : : Atomic < bool > shutdown { false } ;
/** 0 initially. 1 once we initiated shutdown */
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called ;
bool shutdown_called = false ;
int num_pluckers ;
int num_pluckers = 0 ;
plucker pluckers [ GRPC_MAX_COMPLETION_QUEUE_PLUCKERS ] ;
plucker pluckers [ GRPC_MAX_COMPLETION_QUEUE_PLUCKERS ] ;
} cq_pluck_data ;
} ;
typedef struct cq_callback_data {
struct cq_callback_data {
cq_callback_data (
grpc_experimental_completion_queue_functor * shutdown_callback )
: shutdown_callback ( shutdown_callback ) { }
/** No actual completed events queue, unlike other types */
/** No actual completed events queue, unlike other types */
/** Number of pending events (+1 if we're not shutdown) */
/** Number of pending events (+1 if we're not shutdown).
gpr_atm pending_events ;
Initial count is dropped by grpc_completion_queue_shutdown . */
grpc_core : : Atomic < intptr_t > pending_events { 1 } ;
/** Counter of how many things have ever been queued on this completion queue
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever ;
grpc_core : : Atomic < intptr_t > things_queued_ever { 0 } ;
/** 0 initially. 1 once we initiated shutdown */
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called ;
bool shutdown_called = false ;
/** A callback that gets invoked when the CQ completes shutdown */
/** A callback that gets invoked when the CQ completes shutdown */
grpc_experimental_completion_queue_functor * shutdown_callback ;
grpc_experimental_completion_queue_functor * shutdown_callback ;
} cq_callback_data ;
} ;
} // namespace
/* Completion queue structure */
/* Completion queue structure */
struct grpc_completion_queue {
struct grpc_completion_queue {
@ -408,7 +446,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
storage - > done ( storage - > done_arg , storage ) ;
storage - > done ( storage - > done_arg , storage ) ;
ret = 1 ;
ret = 1 ;
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
cq_finish_shutdown_next ( cq ) ;
cq_finish_shutdown_next ( cq ) ;
@ -422,31 +460,21 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
return ret ;
return ret ;
}
}
static void cq_event_queue_init ( grpc_cq_event_queue * q ) {
bool CqEventQueue : : Push ( grpc_cq_completion * c ) {
gpr_mpscq_init ( & q - > queue ) ;
gpr_mpscq_push ( & queue_ , reinterpret_cast < gpr_mpscq_node * > ( c ) ) ;
q - > queue_lock = GPR_SPINLOCK_INITIALIZER ;
return num_queue_items_ . FetchAdd ( 1 , grpc_core : : MemoryOrder : : RELAXED ) = = 0 ;
gpr_atm_no_barrier_store ( & q - > num_queue_items , 0 ) ;
}
}
static void cq_event_queue_destroy ( grpc_cq_event_queue * q ) {
grpc_cq_completion * CqEventQueue : : Pop ( ) {
gpr_mpscq_destroy ( & q - > queue ) ;
}
static bool cq_event_queue_push ( grpc_cq_event_queue * q , grpc_cq_completion * c ) {
gpr_mpscq_push ( & q - > queue , reinterpret_cast < gpr_mpscq_node * > ( c ) ) ;
return gpr_atm_no_barrier_fetch_add ( & q - > num_queue_items , 1 ) = = 0 ;
}
static grpc_cq_completion * cq_event_queue_pop ( grpc_cq_event_queue * q ) {
grpc_cq_completion * c = nullptr ;
grpc_cq_completion * c = nullptr ;
if ( gpr_spinlock_trylock ( & q - > q ueue_lock) ) {
if ( gpr_spinlock_trylock ( & queue_lock_ ) ) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES ( ) ;
GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES ( ) ;
bool is_empty = false ;
bool is_empty = false ;
c = reinterpret_cast < grpc_cq_completion * > (
c = reinterpret_cast < grpc_cq_completion * > (
gpr_mpscq_pop_and_check_end ( & q - > q ueue, & is_empty ) ) ;
gpr_mpscq_pop_and_check_end ( & queue_ , & is_empty ) ) ;
gpr_spinlock_unlock ( & q - > q ueue_lock) ;
gpr_spinlock_unlock ( & queue_lock_ ) ;
if ( c = = nullptr & & ! is_empty ) {
if ( c = = nullptr & & ! is_empty ) {
GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES ( ) ;
GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES ( ) ;
@ -456,18 +484,12 @@ static grpc_cq_completion* cq_event_queue_pop(grpc_cq_event_queue* q) {
}
}
if ( c ) {
if ( c ) {
gpr_atm_no_barrier_fetch_add ( & q - > num_queue_items , - 1 ) ;
num_queue_items_ . FetchSub ( 1 , grpc_core : : MemoryOrder : : RELAXED ) ;
}
}
return c ;
return c ;
}
}
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
static long cq_event_queue_num_items ( grpc_cq_event_queue * q ) {
return static_cast < long > ( gpr_atm_no_barrier_load ( & q - > num_queue_items ) ) ;
}
grpc_completion_queue * grpc_completion_queue_create_internal (
grpc_completion_queue * grpc_completion_queue_create_internal (
grpc_cq_completion_type completion_type , grpc_cq_polling_type polling_type ,
grpc_cq_completion_type completion_type , grpc_cq_polling_type polling_type ,
grpc_experimental_completion_queue_functor * shutdown_callback ) {
grpc_experimental_completion_queue_functor * shutdown_callback ) {
@ -507,49 +529,33 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
static void cq_init_next (
static void cq_init_next (
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
cq_next_data * cqd = static_cast < cq_next_data * > ( data ) ;
new ( data ) cq_next_data ( ) ;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store ( & cqd - > pending_events , 1 ) ;
cqd - > shutdown_called = false ;
gpr_atm_no_barrier_store ( & cqd - > things_queued_ever , 0 ) ;
cq_event_queue_init ( & cqd - > queue ) ;
}
}
static void cq_destroy_next ( void * data ) {
static void cq_destroy_next ( void * data ) {
cq_next_data * cqd = static_cast < cq_next_data * > ( data ) ;
cq_next_data * cqd = static_cast < cq_next_data * > ( data ) ;
GPR_ASSERT ( cq_event_queue_num_items ( & cqd - > queue ) = = 0 ) ;
cqd - > ~ cq_next_data ( ) ;
cq_event_queue_destroy ( & cqd - > queue ) ;
}
}
static void cq_init_pluck (
static void cq_init_pluck (
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
cq_pluck_data * cqd = static_cast < cq_pluck_data * > ( data ) ;
new ( data ) cq_pluck_data ( ) ;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store ( & cqd - > pending_events , 1 ) ;
cqd - > completed_tail = & cqd - > completed_head ;
cqd - > completed_head . next = ( uintptr_t ) cqd - > completed_tail ;
gpr_atm_no_barrier_store ( & cqd - > shutdown , 0 ) ;
cqd - > shutdown_called = false ;
cqd - > num_pluckers = 0 ;
gpr_atm_no_barrier_store ( & cqd - > things_queued_ever , 0 ) ;
}
}
static void cq_destroy_pluck ( void * data ) {
static void cq_destroy_pluck ( void * data ) {
cq_pluck_data * cqd = static_cast < cq_pluck_data * > ( data ) ;
cq_pluck_data * cqd = static_cast < cq_pluck_data * > ( data ) ;
GPR_ASSERT ( cqd - > completed_head . next = = ( uintptr_t ) & cqd - > completed_head ) ;
cqd - > ~ cq_pluck_data ( ) ;
}
}
static void cq_init_callback (
static void cq_init_callback (
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
void * data , grpc_experimental_completion_queue_functor * shutdown_callback ) {
cq_callback_data * cqd = static_cast < cq_callback_data * > ( data ) ;
new ( data ) cq_callback_data ( shutdown_callback ) ;
/* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store ( & cqd - > pending_events , 1 ) ;
cqd - > shutdown_called = false ;
gpr_atm_no_barrier_store ( & cqd - > things_queued_ever , 0 ) ;
cqd - > shutdown_callback = shutdown_callback ;
}
}
static void cq_destroy_callback ( void * data ) { }
static void cq_destroy_callback ( void * data ) {
cq_callback_data * cqd = static_cast < cq_callback_data * > ( data ) ;
cqd - > ~ cq_callback_data ( ) ;
}
grpc_cq_completion_type grpc_get_cq_completion_type ( grpc_completion_queue * cq ) {
grpc_cq_completion_type grpc_get_cq_completion_type ( grpc_completion_queue * cq ) {
return cq - > vtable - > cq_completion_type ;
return cq - > vtable - > cq_completion_type ;
@ -632,37 +638,19 @@ static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
static void cq_check_tag ( grpc_completion_queue * cq , void * tag , bool lock_cq ) { }
static void cq_check_tag ( grpc_completion_queue * cq , void * tag , bool lock_cq ) { }
# endif
# endif
/* Atomically increments a counter only if the counter is not zero. Returns
* true if the increment was successful ; false if the counter is zero */
static bool atm_inc_if_nonzero ( gpr_atm * counter ) {
while ( true ) {
gpr_atm count = gpr_atm_acq_load ( counter ) ;
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
* increment ) to maintain the contract : do not increment the counter if it
* is zero . */
if ( count = = 0 ) {
return false ;
} else if ( gpr_atm_full_cas ( counter , count , count + 1 ) ) {
break ;
}
}
return true ;
}
static bool cq_begin_op_for_next ( grpc_completion_queue * cq , void * tag ) {
static bool cq_begin_op_for_next ( grpc_completion_queue * cq , void * tag ) {
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
return atm_inc_if_nonzero ( & cqd - > pending_events ) ;
return cqd - > pending_events . IncrementIfNonzero ( ) ;
}
}
static bool cq_begin_op_for_pluck ( grpc_completion_queue * cq , void * tag ) {
static bool cq_begin_op_for_pluck ( grpc_completion_queue * cq , void * tag ) {
cq_pluck_data * cqd = static_cast < cq_pluck_data * > DATA_FROM_CQ ( cq ) ;
cq_pluck_data * cqd = static_cast < cq_pluck_data * > DATA_FROM_CQ ( cq ) ;
return atm_inc_if_nonzero ( & cqd - > pending_events ) ;
return cqd - > pending_events . IncrementIfNonzero ( ) ;
}
}
static bool cq_begin_op_for_callback ( grpc_completion_queue * cq , void * tag ) {
static bool cq_begin_op_for_callback ( grpc_completion_queue * cq , void * tag ) {
cq_callback_data * cqd = static_cast < cq_callback_data * > DATA_FROM_CQ ( cq ) ;
cq_callback_data * cqd = static_cast < cq_callback_data * > DATA_FROM_CQ ( cq ) ;
return atm_inc_if_nonzero ( & cqd - > pending_events ) ;
return cqd - > pending_events . IncrementIfNonzero ( ) ;
}
}
bool grpc_cq_begin_op ( grpc_completion_queue * cq , void * tag ) {
bool grpc_cq_begin_op ( grpc_completion_queue * cq , void * tag ) {
@ -716,17 +704,14 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
gpr_tls_set ( & g_cached_event , ( intptr_t ) storage ) ;
gpr_tls_set ( & g_cached_event , ( intptr_t ) storage ) ;
} else {
} else {
/* Add the completion to the queue */
/* Add the completion to the queue */
bool is_first = cq_event_queue_push ( & cqd - > queue , storage ) ;
bool is_first = cqd - > queue . Push ( storage ) ;
gpr_atm_no_barrier_fetch_add ( & cqd - > things_queued_ever , 1 ) ;
cqd - > things_queued_ever . FetchAdd ( 1 , grpc_core : : MemoryOrder : : RELAXED ) ;
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
load here ( instead of a ' no_barrier ' load ) to match with the release
load here ( instead of a ' no_barrier ' load ) to match with the release
store
store
( done via gpr_atm_full_fetch_add ( pending_events , - 1 ) ) in cq_shutdown_next
( done via pending_events . FetchSub ( 1 , ACQ_REL ) ) in cq_shutdown_next
*/
*/
bool will_definitely_shutdown = gpr_atm_acq_load ( & cqd - > pending_events ) = = 1 ;
if ( cqd - > pending_events . Load ( grpc_core : : MemoryOrder : : ACQUIRE ) ! = 1 ) {
if ( ! will_definitely_shutdown ) {
/* Only kick if this is the first item queued */
/* Only kick if this is the first item queued */
if ( is_first ) {
if ( is_first ) {
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
@ -740,7 +725,8 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
GRPC_ERROR_UNREF ( kick_error ) ;
GRPC_ERROR_UNREF ( kick_error ) ;
}
}
}
}
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = =
1 ) {
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
cq_finish_shutdown_next ( cq ) ;
cq_finish_shutdown_next ( cq ) ;
@ -749,7 +735,7 @@ static void cq_end_op_for_next(grpc_completion_queue* cq, void* tag,
}
}
} else {
} else {
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
GRPC_CQ_INTERNAL_REF ( cq , " shutting_down " ) ;
gpr_atm_rel_store ( & cqd - > pending_events , 0 ) ;
cqd - > pending_events . Store ( 0 , grpc_core : : MemoryOrder : : RELEASE ) ;
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
cq_finish_shutdown_next ( cq ) ;
cq_finish_shutdown_next ( cq ) ;
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
@ -795,12 +781,12 @@ static void cq_end_op_for_pluck(grpc_completion_queue* cq, void* tag,
cq_check_tag ( cq , tag , false ) ; /* Used in debug builds only */
cq_check_tag ( cq , tag , false ) ; /* Used in debug builds only */
/* Add to the list of completions */
/* Add to the list of completions */
gpr_atm_no_barrier_fetch_add ( & cqd - > things_queued_ever , 1 ) ;
cqd - > things_queued_ever . FetchAdd ( 1 , grpc_core : : MemoryOrder : : RELAXED ) ;
cqd - > completed_tail - > next =
cqd - > completed_tail - > next =
( ( uintptr_t ) storage ) | ( 1u & cqd - > completed_tail - > next ) ;
( ( uintptr_t ) storage ) | ( 1u & cqd - > completed_tail - > next ) ;
cqd - > completed_tail = storage ;
cqd - > completed_tail = storage ;
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
cq_finish_shutdown_pluck ( cq ) ;
cq_finish_shutdown_pluck ( cq ) ;
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
} else {
} else {
@ -856,8 +842,8 @@ static void cq_end_op_for_callback(
cq_check_tag ( cq , tag , true ) ; /* Used in debug builds only */
cq_check_tag ( cq , tag , true ) ; /* Used in debug builds only */
gpr_atm_no_barrier_fetch_add ( & cqd - > things_queued_ever , 1 ) ;
cqd - > things_queued_ever . FetchAdd ( 1 , grpc_core : : MemoryOrder : : RELAXED ) ;
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
cq_finish_shutdown_callback ( cq ) ;
cq_finish_shutdown_callback ( cq ) ;
}
}
@ -893,20 +879,20 @@ class ExecCtxNext : public grpc_core::ExecCtx {
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
GPR_ASSERT ( a - > stolen_completion = = nullptr ) ;
GPR_ASSERT ( a - > stolen_completion = = nullptr ) ;
gpr_atm current_last_seen_things_queued_ever =
intptr_t current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ;
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ;
if ( current_last_seen_things_queued_ever ! =
if ( current_last_seen_things_queued_ever ! =
a - > last_seen_things_queued_ever ) {
a - > last_seen_things_queued_ever ) {
a - > last_seen_things_queued_ever =
a - > last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ;
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ;
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
* might return NULL in some cases even if the queue is not empty ; but
* might return NULL in some cases even if the queue is not empty ; but
* that
* that
* is ok and doesn ' t affect correctness . Might effect the tail latencies a
* is ok and doesn ' t affect correctness . Might effect the tail latencies a
* bit ) */
* bit ) */
a - > stolen_completion = cq_event_queue_pop ( & cq d - > queue ) ;
a - > stolen_completion = cqd - > queue . Pop ( ) ;
if ( a - > stolen_completion ! = nullptr ) {
if ( a - > stolen_completion ! = nullptr ) {
return true ;
return true ;
}
}
@ -965,7 +951,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up ( deadline ) ;
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up ( deadline ) ;
cq_is_finished_arg is_finished_arg = {
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ,
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ,
cq ,
cq ,
deadline_millis ,
deadline_millis ,
nullptr ,
nullptr ,
@ -985,7 +971,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
break ;
break ;
}
}
grpc_cq_completion * c = cq_event_queue_pop ( & cq d - > queue ) ;
grpc_cq_completion * c = cqd - > queue . Pop ( ) ;
if ( c ! = nullptr ) {
if ( c ! = nullptr ) {
ret . type = GRPC_OP_COMPLETE ;
ret . type = GRPC_OP_COMPLETE ;
@ -999,16 +985,16 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
so that the thread comes back quickly from poll to make a second
so that the thread comes back quickly from poll to make a second
attempt at popping . Not doing this can potentially deadlock this
attempt at popping . Not doing this can potentially deadlock this
thread forever ( if the deadline is infinity ) */
thread forever ( if the deadline is infinity ) */
if ( cq_event_queue_num_items ( & cq d - > queue ) > 0 ) {
if ( cqd - > queue . num_items ( ) > 0 ) {
iteration_deadline = 0 ;
iteration_deadline = 0 ;
}
}
}
}
if ( gpr_atm_acq_load ( & cqd - > pending_events ) = = 0 ) {
if ( cqd - > pending_events . Load ( grpc_core : : MemoryOrder : : ACQUIRE ) = = 0 ) {
/* Before returning, check if the queue has any items left over (since
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop ( ) can sometimes return NULL even if the queue is not
gpr_mpscq_pop ( ) can sometimes return NULL even if the queue is not
empty . If so , keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
empty . If so , keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
if ( cq_event_queue_num_items ( & cq d - > queue ) > 0 ) {
if ( cqd - > queue . num_items ( ) > 0 ) {
/* Go to the beginning of the loop. No point doing a poll because
/* Go to the beginning of the loop. No point doing a poll because
( cq - > shutdown = = true ) is only possible when there is no pending
( cq - > shutdown = = true ) is only possible when there is no pending
work ( i . e cq - > pending_events = = 0 ) and any outstanding completion
work ( i . e cq - > pending_events = = 0 ) and any outstanding completion
@ -1049,8 +1035,8 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
is_finished_arg . first_loop = false ;
is_finished_arg . first_loop = false ;
}
}
if ( cq_event_queue_num_items ( & cq d - > queue ) > 0 & &
if ( cqd - > queue . num_items ( ) > 0 & &
gpr_atm_acq_load ( & cqd - > pending_events ) > 0 ) {
cqd - > pending_events . Load ( grpc_core : : MemoryOrder : : ACQUIRE ) > 0 ) {
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
cq - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cq ) , nullptr ) ;
cq - > poller_vtable - > kick ( POLLSET_FROM_CQ ( cq ) , nullptr ) ;
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
@ -1074,7 +1060,7 @@ static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
cq_next_data * cqd = static_cast < cq_next_data * > DATA_FROM_CQ ( cq ) ;
GPR_ASSERT ( cqd - > shutdown_called ) ;
GPR_ASSERT ( cqd - > shutdown_called ) ;
GPR_ASSERT ( gpr_atm_no_barrier_load ( & cqd - > pending_events ) = = 0 ) ;
GPR_ASSERT ( cqd - > pending_events . Load ( grpc_core : : MemoryOrder : : RELAXED ) = = 0 ) ;
cq - > poller_vtable - > shutdown ( POLLSET_FROM_CQ ( cq ) , & cq - > pollset_shutdown_done ) ;
cq - > poller_vtable - > shutdown ( POLLSET_FROM_CQ ( cq ) , & cq - > pollset_shutdown_done ) ;
}
}
@ -1096,10 +1082,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) {
return ;
return ;
}
}
cqd - > shutdown_called = true ;
cqd - > shutdown_called = true ;
/* Doing a full_fetch_add (i.e acq/release) here to match with
/* Doing acq/release FetchSub here to match with
* cq_begin_op_for_next and cq_end_op_for_next functions which read / write
* cq_begin_op_for_next and cq_end_op_for_next functions which read / write
* on this counter without necessarily holding a lock on cq */
* on this counter without necessarily holding a lock on cq */
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
cq_finish_shutdown_next ( cq ) ;
cq_finish_shutdown_next ( cq ) ;
}
}
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
@ -1148,12 +1134,12 @@ class ExecCtxPluck : public grpc_core::ExecCtx {
GPR_ASSERT ( a - > stolen_completion = = nullptr ) ;
GPR_ASSERT ( a - > stolen_completion = = nullptr ) ;
gpr_atm current_last_seen_things_queued_ever =
gpr_atm current_last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ;
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ;
if ( current_last_seen_things_queued_ever ! =
if ( current_last_seen_things_queued_ever ! =
a - > last_seen_things_queued_ever ) {
a - > last_seen_things_queued_ever ) {
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
a - > last_seen_things_queued_ever =
a - > last_seen_things_queued_ever =
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ;
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ;
grpc_cq_completion * c ;
grpc_cq_completion * c ;
grpc_cq_completion * prev = & cqd - > completed_head ;
grpc_cq_completion * prev = & cqd - > completed_head ;
while ( ( c = ( grpc_cq_completion * ) ( prev - > next &
while ( ( c = ( grpc_cq_completion * ) ( prev - > next &
@ -1209,7 +1195,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_mu_lock ( cq - > mu ) ;
gpr_mu_lock ( cq - > mu ) ;
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up ( deadline ) ;
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up ( deadline ) ;
cq_is_finished_arg is_finished_arg = {
cq_is_finished_arg is_finished_arg = {
gpr_atm_no_barrier_load ( & cqd - > things_queued_ever ) ,
cqd - > things_queued_ever . Load ( grpc_core : : MemoryOrder : : RELAXED ) ,
cq ,
cq ,
deadline_millis ,
deadline_millis ,
nullptr ,
nullptr ,
@ -1246,7 +1232,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
}
prev = c ;
prev = c ;
}
}
if ( gpr_atm_no_barrier_load ( & cqd - > shutdown ) ) {
if ( cqd - > shutdown . Load ( grpc_core : : MemoryOrder : : RELAXED ) ) {
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_SHUTDOWN ;
ret . type = GRPC_QUEUE_SHUTDOWN ;
@ -1309,8 +1295,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data * cqd = static_cast < cq_pluck_data * > DATA_FROM_CQ ( cq ) ;
cq_pluck_data * cqd = static_cast < cq_pluck_data * > DATA_FROM_CQ ( cq ) ;
GPR_ASSERT ( cqd - > shutdown_called ) ;
GPR_ASSERT ( cqd - > shutdown_called ) ;
GPR_ASSERT ( ! gpr_atm_no_barrier_load ( & cqd - > shutdown ) ) ;
GPR_ASSERT ( ! cqd - > shutdown . Load ( grpc_core : : MemoryOrder : : RELAXED ) ) ;
gpr_atm_no_barrier_store ( & cqd - > shutdown , 1 ) ;
cqd - > shutdown . Store ( 1 , grpc_core : : MemoryOrder : : RELAXED ) ;
cq - > poller_vtable - > shutdown ( POLLSET_FROM_CQ ( cq ) , & cq - > pollset_shutdown_done ) ;
cq - > poller_vtable - > shutdown ( POLLSET_FROM_CQ ( cq ) , & cq - > pollset_shutdown_done ) ;
}
}
@ -1334,7 +1320,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
return ;
return ;
}
}
cqd - > shutdown_called = true ;
cqd - > shutdown_called = true ;
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
cq_finish_shutdown_pluck ( cq ) ;
cq_finish_shutdown_pluck ( cq ) ;
}
}
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
@ -1368,7 +1354,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
return ;
return ;
}
}
cqd - > shutdown_called = true ;
cqd - > shutdown_called = true ;
if ( gpr_atm_full_fetch_add ( & cqd - > pending_events , - 1 ) = = 1 ) {
if ( cqd - > pending_events . FetchSub ( 1 , grpc_core : : MemoryOrder : : ACQ_REL ) = = 1 ) {
gpr_mu_unlock ( cq - > mu ) ;
gpr_mu_unlock ( cq - > mu ) ;
cq_finish_shutdown_callback ( cq ) ;
cq_finish_shutdown_callback ( cq ) ;
} else {
} else {