@ -36,18 +36,19 @@
# include <stdio.h>
# include <string.h>
# include "src/core/iomgr/timer.h"
# include <grpc/support/alloc.h>
# include <grpc/support/atm.h>
# include <grpc/support/log.h>
# include <grpc/support/time.h>
# include "src/core/iomgr/pollset.h"
# include "src/core/iomgr/timer.h"
# include "src/core/profiling/timers.h"
# include "src/core/support/string.h"
# include "src/core/surface/api_trace.h"
# include "src/core/surface/call.h"
# include "src/core/surface/event_string.h"
# include "src/core/surface/surface_trace.h"
# include "src/core/profiling/timers.h"
# include <grpc/support/alloc.h>
# include <grpc/support/atm.h>
# include <grpc/support/log.h>
# include <grpc/support/time.h>
typedef struct {
grpc_pollset_worker * * worker ;
@ -56,6 +57,7 @@ typedef struct {
/* Completion queue structure */
struct grpc_completion_queue {
gpr_mu mu ;
/** completed events */
grpc_cq_completion completed_head ;
grpc_cq_completion * completed_tail ;
@ -63,8 +65,6 @@ struct grpc_completion_queue {
gpr_refcount pending_events ;
/** Once owning_refs drops to zero, we will destroy the cq */
gpr_refcount owning_refs ;
/** the set of low level i/o things that concern this cq */
grpc_pollset pollset ;
/** 0 initially, 1 once we've begun shutting down */
int shutdown ;
int shutdown_called ;
@ -82,6 +82,8 @@ struct grpc_completion_queue {
grpc_completion_queue * next_free ;
} ;
# define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1))
static gpr_mu g_freelist_mu ;
grpc_completion_queue * g_freelist ;
@ -94,7 +96,8 @@ void grpc_cq_global_shutdown(void) {
gpr_mu_destroy ( & g_freelist_mu ) ;
while ( g_freelist ) {
grpc_completion_queue * next = g_freelist - > next_free ;
grpc_pollset_destroy ( & g_freelist - > pollset ) ;
grpc_pollset_destroy ( POLLSET_FROM_CQ ( g_freelist ) ) ;
gpr_mu_destroy ( & g_freelist - > mu ) ;
# ifndef NDEBUG
gpr_free ( g_freelist - > outstanding_tags ) ;
# endif
@ -124,8 +127,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
if ( g_freelist = = NULL ) {
gpr_mu_unlock ( & g_freelist_mu ) ;
cc = gpr_malloc ( sizeof ( grpc_completion_queue ) ) ;
grpc_pollset_init ( & cc - > pollset ) ;
cc = gpr_malloc ( sizeof ( grpc_completion_queue ) + grpc_pollset_size ( ) ) ;
gpr_mu_init ( & cc - > mu ) ;
grpc_pollset_init ( POLLSET_FROM_CQ ( cc ) , & cc - > mu ) ;
# ifndef NDEBUG
cc - > outstanding_tags = NULL ;
cc - > outstanding_tag_capacity = 0 ;
@ -184,7 +188,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
# endif
if ( gpr_unref ( & cc - > owning_refs ) ) {
GPR_ASSERT ( cc - > completed_head . next = = ( uintptr_t ) & cc - > completed_head ) ;
grpc_pollset_reset ( & cc - > pollset ) ;
grpc_pollset_reset ( POLLSET_FROM_CQ ( cc ) ) ;
gpr_mu_lock ( & g_freelist_mu ) ;
cc - > next_free = g_freelist ;
g_freelist = cc ;
@ -194,7 +198,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) {
void grpc_cq_begin_op ( grpc_completion_queue * cc , void * tag ) {
# ifndef NDEBUG
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
GPR_ASSERT ( ! cc - > shutdown_called ) ;
if ( cc - > outstanding_tag_count = = cc - > outstanding_tag_capacity ) {
cc - > outstanding_tag_capacity = GPR_MAX ( 4 , 2 * cc - > outstanding_tag_capacity ) ;
@ -203,7 +207,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) {
cc - > outstanding_tag_capacity ) ;
}
cc - > outstanding_tags [ cc - > outstanding_tag_count + + ] = tag ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
# endif
gpr_ref ( & cc - > pending_events ) ;
}
@ -231,7 +235,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
storage - > next =
( ( uintptr_t ) & cc - > completed_head ) | ( ( uintptr_t ) ( success ! = 0 ) ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
# ifndef NDEBUG
for ( i = 0 ; i < ( int ) cc - > outstanding_tag_count ; i + + ) {
if ( cc - > outstanding_tags [ i ] = = tag ) {
@ -256,8 +260,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
break ;
}
}
grpc_pollset_kick ( & cc - > pollset , pluck_worker ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
grpc_pollset_kick ( POLLSET_FROM_CQ ( cc ) , pluck_worker ) ;
gpr_mu_unlock ( & cc - > mu ) ;
} else {
cc - > completed_tail - > next =
( ( uintptr_t ) storage ) | ( 1u & ( uintptr_t ) cc - > completed_tail - > next ) ;
@ -265,8 +269,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_ASSERT ( ! cc - > shutdown ) ;
GPR_ASSERT ( cc - > shutdown_called ) ;
cc - > shutdown = 1 ;
grpc_pollset_shutdown ( exec_ctx , & cc - > pollset , & cc - > pollset_shutdown_done ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
grpc_pollset_shutdown ( exec_ctx , POLLSET_FROM_CQ ( cc ) ,
& cc - > pollset_shutdown_done ) ;
gpr_mu_unlock ( & cc - > mu ) ;
}
GPR_TIMER_END ( " grpc_cq_end_op " , 0 ) ;
@ -294,7 +299,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
deadline = gpr_convert_clock_type ( deadline , GPR_CLOCK_MONOTONIC ) ;
GRPC_CQ_INTERNAL_REF ( cc , " next " ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
for ( ; ; ) {
if ( cc - > completed_tail ! = & cc - > completed_head ) {
grpc_cq_completion * c = ( grpc_cq_completion * ) cc - > completed_head . next ;
@ -302,7 +307,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
if ( c = = cc - > completed_tail ) {
cc - > completed_tail = & cc - > completed_head ;
}
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
ret . type = GRPC_OP_COMPLETE ;
ret . success = c - > next & 1u ;
ret . tag = c - > tag ;
@ -310,14 +315,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
break ;
}
if ( cc - > shutdown ) {
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_SHUTDOWN ;
break ;
}
now = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
if ( ! first_loop & & gpr_time_cmp ( now , deadline ) > = 0 ) {
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
break ;
@ -330,12 +335,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec iteration_deadline = deadline ;
if ( grpc_timer_check ( & exec_ctx , now , & iteration_deadline ) ) {
GPR_TIMER_MARK ( " alarm_triggered " , 0 ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
grpc_exec_ctx_flush ( & exec_ctx ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
continue ;
}
grpc_pollset_work ( & exec_ctx , & cc - > pollset , & worker , now ,
grpc_pollset_work ( & exec_ctx , POLLSET_FROM_CQ ( cc ) , & worker , now ,
iteration_deadline ) ;
}
GRPC_SURFACE_TRACE_RETURNED_EVENT ( cc , & ret ) ;
@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
deadline = gpr_convert_clock_type ( deadline , GPR_CLOCK_MONOTONIC ) ;
GRPC_CQ_INTERNAL_REF ( cc , " pluck " ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
for ( ; ; ) {
prev = & cc - > completed_head ;
while ( ( c = ( grpc_cq_completion * ) ( prev - > next & ~ ( uintptr_t ) 1 ) ) ! =
@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if ( c = = cc - > completed_tail ) {
cc - > completed_tail = prev ;
}
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
ret . type = GRPC_OP_COMPLETE ;
ret . success = c - > next & 1u ;
ret . tag = c - > tag ;
@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
prev = c ;
}
if ( cc - > shutdown ) {
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_SHUTDOWN ;
break ;
@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
" Too many outstanding grpc_completion_queue_pluck calls: maximum "
" is %d " ,
GRPC_MAX_COMPLETION_QUEUE_PLUCKERS ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
/* TODO(ctiller): should we use a different result here */
ret . type = GRPC_QUEUE_TIMEOUT ;
@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
now = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
if ( ! first_loop & & gpr_time_cmp ( now , deadline ) > = 0 ) {
del_plucker ( cc , tag , & worker ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
memset ( & ret , 0 , sizeof ( ret ) ) ;
ret . type = GRPC_QUEUE_TIMEOUT ;
break ;
@ -447,12 +452,12 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec iteration_deadline = deadline ;
if ( grpc_timer_check ( & exec_ctx , now , & iteration_deadline ) ) {
GPR_TIMER_MARK ( " alarm_triggered " , 0 ) ;
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
grpc_exec_ctx_flush ( & exec_ctx ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
continue ;
}
grpc_pollset_work ( & exec_ctx , & cc - > pollset , & worker , now ,
grpc_pollset_work ( & exec_ctx , POLLSET_FROM_CQ ( cc ) , & worker , now ,
iteration_deadline ) ;
del_plucker ( cc , tag , & worker ) ;
}
@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT ;
GPR_TIMER_BEGIN ( " grpc_completion_queue_shutdown " , 0 ) ;
GRPC_API_TRACE ( " grpc_completion_queue_shutdown(cc=%p) " , 1 , ( cc ) ) ;
gpr_mu_lock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_lock ( & cc - > mu ) ;
if ( cc - > shutdown_called ) {
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
GPR_TIMER_END ( " grpc_completion_queue_shutdown " , 0 ) ;
return ;
}
@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if ( gpr_unref ( & cc - > pending_events ) ) {
GPR_ASSERT ( ! cc - > shutdown ) ;
cc - > shutdown = 1 ;
grpc_pollset_shutdown ( & exec_ctx , & cc - > pollset , & cc - > pollset_shutdown_done ) ;
grpc_pollset_shutdown ( & exec_ctx , POLLSET_FROM_CQ ( cc ) ,
& cc - > pollset_shutdown_done ) ;
}
gpr_mu_unlock ( GRPC_POLLSET_MU ( & cc - > pollset ) ) ;
gpr_mu_unlock ( & cc - > mu ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
GPR_TIMER_END ( " grpc_completion_queue_shutdown " , 0 ) ;
}
@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset * grpc_cq_pollset ( grpc_completion_queue * cc ) {
return & cc - > pollset ;
return POLLSET_FROM_CQ ( cc ) ;
}
void grpc_cq_mark_server_cq ( grpc_completion_queue * cc ) { cc - > is_server_cq = 1 ; }