@ -53,7 +53,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void * arg ;
void ( * start ) ( grpc_server * server , void * arg , grpc_pollset * pollset ) ;
void ( * start ) ( grpc_server * server , void * arg , grpc_pollset * * pollsets , size_t pollset_coun t ) ;
void ( * destroy ) ( grpc_server * server , void * arg ) ;
struct listener * next ;
} listener ;
@ -101,6 +101,7 @@ struct registered_method {
char * host ;
call_data * pending ;
requested_call_array requested ;
grpc_completion_queue * cq ;
registered_method * next ;
} ;
@ -127,7 +128,11 @@ struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
grpc_channel_args * channel_args ;
grpc_completion_queue * cq ;
grpc_completion_queue * unregistered_cq ;
grpc_completion_queue * * cqs ;
grpc_pollset * * pollsets ;
size_t cq_count ;
gpr_mu mu ;
@ -169,6 +174,7 @@ struct call_data {
grpc_mdstr * host ;
legacy_data * legacy ;
grpc_completion_queue * cq_new ;
call_data * * root [ CALL_LIST_COUNT ] ;
call_link links [ CALL_LIST_COUNT ] ;
@ -496,7 +502,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem ( grpc_call_element * elem ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
in t i ;
size_ t i ;
gpr_mu_lock ( & chand - > server - > mu ) ;
for ( i = 0 ; i < CALL_LIST_COUNT ; i + + ) {
@ -504,7 +510,9 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
if ( chand - > server - > shutdown & & chand - > server - > have_shutdown_tag & &
chand - > server - > lists [ ALL_CALLS ] = = NULL ) {
grpc_cq_end_server_shutdown ( chand - > server - > cq , chand - > server - > shutdown_tag ) ;
for ( i = 0 ; i < chand - > server - > cq_count ; i + + ) {
grpc_cq_end_server_shutdown ( chand - > server - > cqs [ i ] , chand - > server - > shutdown_tag ) ;
}
}
gpr_mu_unlock ( & chand - > server - > mu ) ;
@ -557,6 +565,16 @@ static const grpc_channel_filter server_surface_filter = {
sizeof ( channel_data ) , init_channel_elem , destroy_channel_elem , " server " ,
} ;
static void addcq ( grpc_server * server , grpc_completion_queue * cq ) {
size_t i , n ;
for ( i = 0 ; i < server - > cq_count ; i + + ) {
if ( server - > cqs [ i ] = = cq ) return ;
}
n = server - > cq_count + + ;
server - > cqs = gpr_realloc ( server - > cqs , server - > cq_count * sizeof ( grpc_completion_queue * ) ) ;
server - > cqs [ n ] = cq ;
}
grpc_server * grpc_server_create_from_filters ( grpc_completion_queue * cq ,
grpc_channel_filter * * filters ,
size_t filter_count ,
@ -566,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_server * server = gpr_malloc ( sizeof ( grpc_server ) ) ;
memset ( server , 0 , sizeof ( grpc_server ) ) ;
if ( cq ) addcq ( server , cq ) ;
gpr_mu_init ( & server - > mu ) ;
server - > cq = cq ;
server - > unregistered_ cq = cq ;
/* decremented by grpc_server_destroy */
gpr_ref_init ( & server - > internal_refcount , 1 ) ;
server - > root_channel_data . next = server - > root_channel_data . prev =
@ -605,7 +624,7 @@ static int streq(const char *a, const char *b) {
}
void * grpc_server_register_method ( grpc_server * server , const char * method ,
const char * host ) {
const char * host , grpc_completion_queue * cq_new_rpc ) {
registered_method * m ;
if ( ! method ) {
gpr_log ( GPR_ERROR , " %s method string cannot be NULL " , __FUNCTION__ ) ;
@ -618,20 +637,28 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL ;
}
}
addcq ( server , cq_new_rpc ) ;
m = gpr_malloc ( sizeof ( registered_method ) ) ;
memset ( m , 0 , sizeof ( * m ) ) ;
m - > method = gpr_strdup ( method ) ;
m - > host = gpr_strdup ( host ) ;
m - > next = server - > registered_methods ;
m - > cq = cq_new_rpc ;
server - > registered_methods = m ;
return m ;
}
void grpc_server_start ( grpc_server * server ) {
listener * l ;
size_t i ;
server - > pollsets = gpr_malloc ( sizeof ( grpc_pollset * ) * server - > cq_count ) ;
for ( i = 0 ; i < server - > cq_count ; i + + ) {
server - > pollsets [ i ] = grpc_cq_pollset ( server - > cqs [ i ] ) ;
}
for ( l = server - > listeners ; l ; l = l - > next ) {
l - > start ( server , l - > arg , grpc_cq_pollset ( server - > cq ) ) ;
l - > start ( server , l - > arg , server - > pollsets , server - > cq_count ) ;
}
}
@ -664,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters [ i ] = & grpc_connected_channel_filter ;
grpc_transport_add_to_pollset ( transport , grpc_cq_pollset ( s - > cq ) ) ;
for ( i = 0 ; i < s - > cq_count ; i + + ) {
grpc_transport_add_to_pollset ( transport , grpc_cq_pollset ( s - > cqs [ i ] ) ) ;
}
channel = grpc_channel_create_from_filters ( filters , num_filters ,
s - > channel_args , mdctx , 0 ) ;
@ -765,9 +794,11 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server - > have_shutdown_tag = have_shutdown_tag ;
server - > shutdown_tag = shutdown_tag ;
if ( have_shutdown_tag ) {
grpc_cq_begin_op ( server - > cq , NULL , GRPC_SERVER_SHUTDOWN ) ;
if ( server - > lists [ ALL_CALLS ] = = NULL ) {
grpc_cq_end_server_shutdown ( server - > cq , shutdown_tag ) ;
for ( i = 0 ; i < server - > cq_count ; i + + ) {
grpc_cq_begin_op ( server - > cqs [ i ] , NULL , GRPC_SERVER_SHUTDOWN ) ;
if ( server - > lists [ ALL_CALLS ] = = NULL ) {
grpc_cq_end_server_shutdown ( server - > cqs [ i ] , shutdown_tag ) ;
}
}
}
gpr_mu_unlock ( & server - > mu ) ;
@ -826,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) {
void grpc_server_add_listener ( grpc_server * server , void * arg ,
void ( * start ) ( grpc_server * server , void * arg ,
grpc_pollset * pollset ) ,
grpc_pollset * * pollsets , size_t pollset_coun t ) ,
void ( * destroy ) ( grpc_server * server , void * arg ) ) {
listener * l = gpr_malloc ( sizeof ( listener ) ) ;
l - > arg = arg ;
@ -878,7 +909,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_completion_queue * cq_bind ,
void * tag ) {
requested_call rc ;
grpc_cq_begin_op ( server - > cq , NULL , GRPC_OP_COMPLETE ) ;
grpc_cq_begin_op ( server - > unregistered_ cq, NULL , GRPC_OP_COMPLETE ) ;
rc . type = BATCH_CALL ;
rc . tag = tag ;
rc . data . batch . cq_bind = cq_bind ;
@ -889,12 +920,13 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
}
grpc_call_error grpc_server_request_registered_call (
grpc_server * server , void * registered_ method , grpc_call * * call ,
grpc_server * server , void * rm , grpc_call * * call ,
gpr_timespec * deadline , grpc_metadata_array * initial_metadata ,
grpc_byte_buffer * * optional_payload , grpc_completion_queue * cq_bind ,
void * tag ) {
requested_call rc ;
grpc_cq_begin_op ( server - > cq , NULL , GRPC_OP_COMPLETE ) ;
registered_method * registered_method = rm ;
grpc_cq_begin_op ( registered_method - > cq , NULL , GRPC_OP_COMPLETE ) ;
rc . type = REGISTERED_CALL ;
rc . tag = tag ;
rc . data . registered . cq_bind = cq_bind ;
@ -909,7 +941,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_call_error grpc_server_request_call_old ( grpc_server * server ,
void * tag_new ) {
requested_call rc ;
grpc_cq_begin_op ( server - > cq , NULL , GRPC_SERVER_RPC_NEW ) ;
grpc_cq_begin_op ( server - > unregistered_ cq, NULL , GRPC_SERVER_RPC_NEW ) ;
rc . type = LEGACY_CALL ;
rc . tag = tag_new ;
return queue_call_request ( server , & rc ) ;
@ -965,6 +997,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r - > op = GRPC_IOREQ_RECV_INITIAL_METADATA ;
r - > data . recv_metadata = rc - > data . batch . initial_metadata ;
r + + ;
calld - > cq_new = server - > unregistered_cq ;
publish = publish_registered_or_batch ;
break ;
case REGISTERED_CALL :
@ -979,6 +1012,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r - > data . recv_message = rc - > data . registered . optional_payload ;
r + + ;
}
calld - > cq_new = rc - > data . registered . registered_method - > cq ;
publish = publish_registered_or_batch ;
break ;
}
@ -991,19 +1025,19 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call ( grpc_server * server , requested_call * rc ) {
switch ( rc - > type ) {
case LEGACY_CALL :
grpc_cq_end_new_rpc ( server - > cq , rc - > tag , NULL , do_nothing , NULL , NULL ,
grpc_cq_end_new_rpc ( server - > unregistered_ cq, rc - > tag , NULL , do_nothing , NULL , NULL ,
NULL , gpr_inf_past , 0 , NULL ) ;
break ;
case BATCH_CALL :
* rc - > data . batch . call = NULL ;
rc - > data . batch . initial_metadata - > count = 0 ;
grpc_cq_end_op_complete ( server - > cq , rc - > tag , NULL , do_nothing , NULL ,
grpc_cq_end_op_complete ( server - > unregistered_ cq, rc - > tag , NULL , do_nothing , NULL ,
GRPC_OP_ERROR ) ;
break ;
case REGISTERED_CALL :
* rc - > data . registered . call = NULL ;
rc - > data . registered . initial_metadata - > count = 0 ;
grpc_cq_end_op_complete ( s erv er- > cq , rc - > tag , NULL , do_nothing , NULL ,
grpc_cq_end_op_complete ( rc - > data . regist ered . registered_method - > cq , rc - > tag , NULL , do_nothing , NULL ,
GRPC_OP_ERROR ) ;
break ;
}
@ -1017,7 +1051,7 @@ static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
grpc_server * server = chand - > server ;
if ( status = = GRPC_OP_OK ) {
grpc_cq_end_new_rpc ( server - > cq , tag , call , do_nothing , NULL ,
grpc_cq_end_new_rpc ( server - > unregistered_ cq, tag , call , do_nothing , NULL ,
grpc_mdstr_as_c_string ( calld - > path ) ,
grpc_mdstr_as_c_string ( calld - > host ) , calld - > deadline ,
calld - > legacy - > initial_metadata . count ,
@ -1032,9 +1066,8 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void * tag ) {
grpc_call_element * elem =
grpc_call_stack_element ( grpc_call_get_call_stack ( call ) , 0 ) ;
channel_data * chand = elem - > channel_data ;
grpc_server * server = chand - > server ;
grpc_cq_end_op_complete ( server - > cq , tag , call , do_nothing , NULL , status ) ;
call_data * calld = elem - > call_data ;
grpc_cq_end_op_complete ( calld - > cq_new , tag , call , do_nothing , NULL , status ) ;
}
const grpc_channel_args * grpc_server_get_channel_args ( grpc_server * server ) {