@ -127,6 +127,11 @@ struct channel_data {
grpc_iomgr_closure finish_destroy_channel_closure ;
} ;
typedef struct shutdown_tag {
void * tag ;
grpc_completion_queue * cq ;
} shutdown_tag ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
@ -137,14 +142,14 @@ struct grpc_server {
size_t cq_count ;
gpr_mu mu ;
gpr_cv cv ;
registered_method * registered_methods ;
requested_call_array requested_calls ;
gpr_uint8 shutdown ;
gpr_uint8 shutdown_published ;
size_t num_shutdown_tags ;
void * * shutdown_tags ;
shutdown_tag * shutdown_tags ;
call_data * lists [ CALL_LIST_COUNT ] ;
channel_data root_channel_data ;
@ -261,29 +266,32 @@ static void server_ref(grpc_server *server) {
gpr_ref ( & server - > internal_refcount ) ;
}
static void server_unref ( grpc_server * server ) {
static void server_delete ( grpc_server * server ) {
registered_method * rm ;
size_t i ;
grpc_channel_args_destroy ( server - > channel_args ) ;
gpr_mu_destroy ( & server - > mu ) ;
gpr_free ( server - > channel_filters ) ;
requested_call_array_destroy ( & server - > requested_calls ) ;
while ( ( rm = server - > registered_methods ) ! = NULL ) {
server - > registered_methods = rm - > next ;
gpr_free ( rm - > method ) ;
gpr_free ( rm - > host ) ;
requested_call_array_destroy ( & rm - > requested ) ;
gpr_free ( rm ) ;
}
for ( i = 0 ; i < server - > cq_count ; i + + ) {
grpc_cq_internal_unref ( server - > cqs [ i ] ) ;
}
gpr_free ( server - > cqs ) ;
gpr_free ( server - > pollsets ) ;
gpr_free ( server - > shutdown_tags ) ;
gpr_free ( server ) ;
}
static void server_unref ( grpc_server * server ) {
if ( gpr_unref ( & server - > internal_refcount ) ) {
grpc_channel_args_destroy ( server - > channel_args ) ;
gpr_mu_destroy ( & server - > mu ) ;
gpr_cv_destroy ( & server - > cv ) ;
gpr_free ( server - > channel_filters ) ;
requested_call_array_destroy ( & server - > requested_calls ) ;
while ( ( rm = server - > registered_methods ) ! = NULL ) {
server - > registered_methods = rm - > next ;
gpr_free ( rm - > method ) ;
gpr_free ( rm - > host ) ;
requested_call_array_destroy ( & rm - > requested ) ;
gpr_free ( rm ) ;
}
for ( i = 0 ; i < server - > cq_count ; i + + ) {
grpc_cq_internal_unref ( server - > cqs [ i ] ) ;
}
gpr_free ( server - > cqs ) ;
gpr_free ( server - > pollsets ) ;
gpr_free ( server - > shutdown_tags ) ;
gpr_free ( server ) ;
server_delete ( server ) ;
}
}
@ -378,6 +386,26 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
}
static int num_listeners ( grpc_server * server ) {
listener * l ;
int n = 0 ;
for ( l = server - > listeners ; l ; l = l - > next ) {
n + + ;
}
return n ;
}
static void maybe_finish_shutdown ( grpc_server * server ) {
size_t i ;
if ( server - > shutdown & & ! server - > shutdown_published & & server - > lists [ ALL_CALLS ] = = NULL & & server - > listeners_destroyed = = num_listeners ( server ) ) {
server - > shutdown_published = 1 ;
for ( i = 0 ; i < server - > num_shutdown_tags ; i + + ) {
grpc_cq_end_op ( server - > shutdown_tags [ i ] . cq , server - > shutdown_tags [ i ] . tag ,
NULL , 1 ) ;
}
}
}
static grpc_mdelem * server_filter ( void * user_data , grpc_mdelem * md ) {
grpc_call_element * elem = user_data ;
channel_data * chand = elem - > channel_data ;
@ -441,6 +469,9 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
}
if ( call_list_remove ( calld , ALL_CALLS ) ) {
maybe_finish_shutdown ( chand - > server ) ;
}
gpr_mu_unlock ( & chand - > server - > mu ) ;
break ;
}
@ -539,19 +570,15 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem ( grpc_call_element * elem ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
size_t i , j ;
int removed [ CALL_LIST_COUNT ] ;
size_t i ;
gpr_mu_lock ( & chand - > server - > mu ) ;
for ( i = 0 ; i < CALL_LIST_COUNT ; i + + ) {
call_list_remove ( elem - > call_data , i ) ;
removed [ i ] = call_list_remove ( elem - > call_data , i ) ;
}
if ( chand - > server - > shutdown & & chand - > server - > lists [ ALL_CALLS ] = = NULL ) {
for ( i = 0 ; i < chand - > server - > num_shutdown_tags ; i + + ) {
for ( j = 0 ; j < chand - > server - > cq_count ; j + + ) {
grpc_cq_end_op ( chand - > server - > cqs [ j ] , chand - > server - > shutdown_tags [ i ] ,
NULL , 1 ) ;
}
}
if ( removed [ ALL_CALLS ] ) {
maybe_finish_shutdown ( chand - > server ) ;
}
gpr_mu_unlock ( & chand - > server - > mu ) ;
@ -646,7 +673,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset ( server , 0 , sizeof ( grpc_server ) ) ;
gpr_mu_init ( & server - > mu ) ;
gpr_cv_init ( & server - > cv ) ;
/* decremented by grpc_server_destroy */
gpr_ref_init ( & server - > internal_refcount , 1 ) ;
@ -806,38 +832,28 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result ;
}
static int num_listeners ( grpc_server * server ) {
listener * l ;
int n = 0 ;
for ( l = server - > listeners ; l ; l = l - > next ) {
n + + ;
}
return n ;
}
static void shutdown_internal ( grpc_server * server , gpr_uint8 have_shutdown_tag ,
void * shutdown_tag ) {
void grpc_server_shutdown_and_notify ( grpc_server * server ,
grpc_completion_queue * cq , void * tag ) {
listener * l ;
requested_call_array requested_calls ;
channel_data * * channels ;
channel_data * c ;
size_t nchannels ;
size_t i , j ;
size_t i ;
grpc_channel_op op ;
grpc_channel_element * elem ;
registered_method * rm ;
shutdown_tag * sdt ;
/* lock, and gather up some stuff to do */
gpr_mu_lock ( & server - > mu ) ;
if ( have_shutdown_tag ) {
for ( i = 0 ; i < server - > cq_count ; i + + ) {
grpc_cq_begin_op ( server - > cqs [ i ] , NULL ) ;
}
server - > shutdown_tags =
gpr_realloc ( server - > shutdown_tags ,
sizeof ( void * ) * ( server - > num_shutdown_tags + 1 ) ) ;
server - > shutdown_tags [ server - > num_shutdown_tags + + ] = shutdown_tag ;
}
grpc_cq_begin_op ( cq , NULL ) ;
server - > shutdown_tags =
gpr_realloc ( server - > shutdown_tags ,
sizeof ( shutdown_tag ) * ( server - > num_shutdown_tags + 1 ) ) ;
sdt = & server - > shutdown_tags [ server - > num_shutdown_tags + + ] ;
sdt - > tag = tag ;
sdt - > cq = cq ;
if ( server - > shutdown ) {
gpr_mu_unlock ( & server - > mu ) ;
return ;
@ -878,13 +894,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
server - > shutdown = 1 ;
if ( server - > lists [ ALL_CALLS ] = = NULL ) {
for ( i = 0 ; i < server - > num_shutdown_tags ; i + + ) {
for ( j = 0 ; j < server - > cq_count ; j + + ) {
grpc_cq_end_op ( server - > cqs [ j ] , server - > shutdown_tags [ i ] , NULL , 1 ) ;
}
}
}
maybe_finish_shutdown ( server ) ;
gpr_mu_unlock ( & server - > mu ) ;
for ( i = 0 ; i < nchannels ; i + + ) {
@ -914,46 +924,64 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
}
void grpc_server_shutdown ( grpc_server * server ) {
shutdown_internal ( server , 0 , NULL ) ;
}
void grpc_server_shutdown_and_notify ( grpc_server * server , void * tag ) {
shutdown_internal ( server , 1 , tag ) ;
}
void grpc_server_listener_destroy_done ( void * s ) {
grpc_server * server = s ;
gpr_mu_lock ( & server - > mu ) ;
server - > listeners_destroyed + + ;
gpr_cv_signal ( & server - > cv ) ;
maybe_finish_shutdown ( server ) ;
gpr_mu_unlock ( & server - > mu ) ;
}
void grpc_server_destroy ( grpc_server * server ) {
channel_data * c ;
listener * l ;
size_t i ;
void grpc_server_cancel_all_calls ( grpc_server * server ) {
call_data * calld ;
grpc_call * * calls ;
size_t call_count ;
size_t call_capacity ;
int is_first = 1 ;
size_t i ;
gpr_mu_lock ( & server - > mu ) ;
if ( ! server - > shutdown ) {
GPR_ASSERT ( server - > shutdown ) ;
if ( ! server - > lists [ ALL_CALLS ] ) {
gpr_mu_unlock ( & server - > mu ) ;
grpc_server_shutdown ( server ) ;
gpr_mu_lock ( & server - > mu ) ;
return ;
}
while ( server - > listeners_destroyed ! = num_listeners ( server ) ) {
for ( i = 0 ; i < server - > cq_count ; i + + ) {
gpr_mu_unlock ( & server - > mu ) ;
grpc_cq_hack_spin_pollset ( server - > cqs [ i ] ) ;
gpr_mu_lock ( & server - > mu ) ;
call_capacity = 8 ;
call_count = 0 ;
calls = gpr_malloc ( sizeof ( grpc_call * ) * call_capacity ) ;
for ( calld = server - > lists [ ALL_CALLS ] ; calld ! = server - > lists [ ALL_CALLS ] | | is_first ; calld = calld - > links [ ALL_CALLS ] . next ) {
if ( call_count = = call_capacity ) {
call_capacity * = 2 ;
calls = gpr_realloc ( calls , sizeof ( grpc_call * ) * call_capacity ) ;
}
calls [ call_count + + ] = calld - > call ;
GRPC_CALL_INTERNAL_REF ( calld - > call , " cancel_all " ) ;
is_first = 0 ;
}
gpr_mu_unlock ( & server - > mu ) ;
gpr_cv_wait ( & server - > cv , & server - > mu ,
gpr_time_add ( gpr_now ( ) , gpr_time_from_millis ( 100 ) ) ) ;
for ( i = 0 ; i < call_count ; i + + ) {
grpc_call_cancel_with_status ( calls [ i ] , GRPC_STATUS_UNAVAILABLE , " Unavailable " ) ;
GRPC_CALL_INTERNAL_UNREF ( calls [ i ] , " cancel_all " , 1 ) ;
}
gpr_free ( calls ) ;
}
void grpc_server_destroy ( grpc_server * server ) {
channel_data * c ;
listener * l ;
call_data * calld ;
gpr_mu_lock ( & server - > mu ) ;
GPR_ASSERT ( server - > shutdown | | ! server - > listeners ) ;
GPR_ASSERT ( server - > listeners_destroyed = = num_listeners ( server ) ) ;
while ( server - > listeners ) {
l = server - > listeners ;
server - > listeners = l - > next ;
@ -962,10 +990,6 @@ void grpc_server_destroy(grpc_server *server) {
while ( ( calld = call_list_remove_head ( & server - > lists [ PENDING_START ] ,
PENDING_START ) ) ! = NULL ) {
/* TODO(dgq): If we knew the size of the call list (or an upper bound), we
* could allocate all the memory for the closures in advance in a single
* chunk */
gpr_log ( GPR_DEBUG , " server destroys call %p " , calld - > call ) ;
calld - > state = ZOMBIED ;
grpc_iomgr_closure_init (
& calld - > kill_zombie_closure , kill_zombie ,
@ -1111,6 +1135,7 @@ static void begin_call(grpc_server *server, call_data *calld,
rc - > data . batch . details - > deadline = calld - > deadline ;
r - > op = GRPC_IOREQ_RECV_INITIAL_METADATA ;
r - > data . recv_metadata = rc - > data . batch . initial_metadata ;
r - > flags = 0 ;
r + + ;
publish = publish_registered_or_batch ;
break ;
@ -1118,10 +1143,12 @@ static void begin_call(grpc_server *server, call_data *calld,
* rc - > data . registered . deadline = calld - > deadline ;
r - > op = GRPC_IOREQ_RECV_INITIAL_METADATA ;
r - > data . recv_metadata = rc - > data . registered . initial_metadata ;
r - > flags = 0 ;
r + + ;
if ( rc - > data . registered . optional_payload ) {
r - > op = GRPC_IOREQ_RECV_MESSAGE ;
r - > data . recv_message = rc - > data . registered . optional_payload ;
r - > flags = 0 ;
r + + ;
}
publish = publish_registered_or_batch ;