@ -47,7 +47,8 @@
# include "src/core/lib/iomgr/executor.h"
# include "src/core/lib/iomgr/iomgr.h"
# include "src/core/lib/slice/slice_internal.h"
# include "src/core/lib/support/stack_lockfree.h"
# include "src/core/lib/support/mpscq.h"
# include "src/core/lib/support/spinlock.h"
# include "src/core/lib/support/string.h"
# include "src/core/lib/surface/api_trace.h"
# include "src/core/lib/surface/call.h"
@ -76,6 +77,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
int grpc_server_channel_trace = 0 ;
typedef struct requested_call {
gpr_mpscq_node request_link ; /* must be first */
requested_call_type type ;
size_t cq_idx ;
void * tag ;
@ -175,7 +177,7 @@ struct request_matcher {
grpc_server * server ;
call_data * pending_head ;
call_data * pending_tail ;
gpr_stack_lockfree * * requests_per_cq ;
gpr_locked_mpscq * requests_per_cq ;
} ;
struct registered_method {
@ -220,11 +222,6 @@ struct grpc_server {
registered_method * registered_methods ;
/** one request matcher for unregistered methods */
request_matcher unregistered_request_matcher ;
/** free list of available requested_calls_per_cq indices */
gpr_stack_lockfree * * request_freelist_per_cq ;
/** requested call backing data */
requested_call * * requested_calls_per_cq ;
int max_requested_calls_per_cq ;
gpr_atm shutdown_flag ;
uint8_t shutdown_published ;
@ -324,21 +321,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher
*/
static void request_matcher_init ( request_matcher * rm , size_t entries ,
grpc_server * server ) {
static void request_matcher_init ( request_matcher * rm , grpc_server * server ) {
memset ( rm , 0 , sizeof ( * rm ) ) ;
rm - > server = server ;
rm - > requests_per_cq =
gpr_malloc ( sizeof ( * rm - > requests_per_cq ) * server - > cq_count ) ;
for ( size_t i = 0 ; i < server - > cq_count ; i + + ) {
rm - > requests_per_cq [ i ] = gpr_stack_lockfree_create ( entries ) ;
gpr_locked_mpscq_init ( & rm - > requests_per_cq [ i ] ) ;
}
}
static void request_matcher_destroy ( request_matcher * rm ) {
for ( size_t i = 0 ; i < rm - > server - > cq_count ; i + + ) {
GPR_ASSERT ( gpr_stack_lockfree_pop ( rm - > requests_per_cq [ i ] ) = = - 1 ) ;
gpr_stack_lockfree_destroy ( rm - > requests_per_cq [ i ] ) ;
GPR_ASSERT ( gpr_locked_mpscq_pop ( & rm - > requests_per_cq [ i ] ) = = NULL ) ;
gpr_locked_mpscq_destroy ( & rm - > requests_per_cq [ i ] ) ;
}
gpr_free ( rm - > requests_per_cq ) ;
}
@ -368,13 +364,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server * server ,
request_matcher * rm ,
grpc_error * error ) {
int request_id ;
requested_call * rc ;
for ( size_t i = 0 ; i < server - > cq_count ; i + + ) {
while ( ( request_id = gpr_stack_lockfree_pop ( rm - > requests_per_cq [ i ] ) ) ! =
- 1 ) {
fail_call ( exec_ctx , server , i ,
& server - > requested_calls_per_cq [ i ] [ request_id ] ,
GRPC_ERROR_REF ( error ) ) ;
/* Here we know:
1. no requests are being added ( since the server is shut down )
2. no other threads are pulling ( since the shut down process is single
threaded )
So , we can ignore the queue lock and just pop , with the guarantee that a
NULL returned here truly means that the queue is empty */
while ( ( rc = ( requested_call * ) gpr_mpscq_pop (
& rm - > requests_per_cq [ i ] . queue ) ) ! = NULL ) {
fail_call ( exec_ctx , server , i , rc , GRPC_ERROR_REF ( error ) ) ;
}
}
GRPC_ERROR_UNREF ( error ) ;
@ -409,13 +409,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
}
for ( i = 0 ; i < server - > cq_count ; i + + ) {
GRPC_CQ_INTERNAL_UNREF ( server - > cqs [ i ] , " server " ) ;
if ( server - > started ) {
gpr_stack_lockfree_destroy ( server - > request_freelist_per_cq [ i ] ) ;
gpr_free ( server - > requested_calls_per_cq [ i ] ) ;
}
}
gpr_free ( server - > request_freelist_per_cq ) ;
gpr_free ( server - > requested_calls_per_cq ) ;
gpr_free ( server - > cqs ) ;
gpr_free ( server - > pollsets ) ;
gpr_free ( server - > shutdown_tags ) ;
@ -473,21 +467,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
static void done_request_event ( grpc_exec_ctx * exec_ctx , void * req ,
grpc_cq_completion * c ) {
requested_call * rc = req ;
grpc_server * server = rc - > server ;
if ( rc > = server - > requested_calls_per_cq [ rc - > cq_idx ] & &
rc < server - > requested_calls_per_cq [ rc - > cq_idx ] +
server - > max_requested_calls_per_cq ) {
GPR_ASSERT ( rc - server - > requested_calls_per_cq [ rc - > cq_idx ] < = INT_MAX ) ;
gpr_stack_lockfree_push (
server - > request_freelist_per_cq [ rc - > cq_idx ] ,
( int ) ( rc - server - > requested_calls_per_cq [ rc - > cq_idx ] ) ) ;
} else {
gpr_free ( req ) ;
}
server_unref ( exec_ctx , server ) ;
gpr_free ( req ) ;
}
static void publish_call ( grpc_exec_ctx * exec_ctx , grpc_server * server ,
@ -516,10 +496,6 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
GPR_UNREACHABLE_CODE ( return ) ;
}
grpc_call_element * elem =
grpc_call_stack_element ( grpc_call_get_call_stack ( call ) , 0 ) ;
channel_data * chand = elem - > channel_data ;
server_ref ( chand - > server ) ;
grpc_cq_end_op ( exec_ctx , calld - > cq_new , rc - > tag , GRPC_ERROR_NONE ,
done_request_event , rc , & rc - > completion ) ;
}
@ -547,15 +523,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
for ( size_t i = 0 ; i < server - > cq_count ; i + + ) {
size_t cq_idx = ( chand - > cq_idx + i ) % server - > cq_count ;
int request_id = gpr_stack_lockfree_pop ( rm - > requests_per_cq [ cq_idx ] ) ;
if ( request_id = = - 1 ) {
requested_call * rc =
( requested_call * ) gpr_locked_mpscq_pop ( & rm - > requests_per_cq [ cq_idx ] ) ;
if ( rc = = NULL ) {
continue ;
} else {
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
publish_call ( exec_ctx , server , calld , cq_idx ,
& server - > requested_calls_per_cq [ cq_idx ] [ request_id ] ) ;
publish_call ( exec_ctx , server , calld , cq_idx , rc ) ;
return ; /* early out */
}
}
@ -1029,8 +1005,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
server - > root_channel_data . next = server - > root_channel_data . prev =
& server - > root_channel_data ;
/* TODO(ctiller): expose a channel_arg for this */
server - > max_requested_calls_per_cq = 32768 ;
server - > channel_args = grpc_channel_args_copy ( args ) ;
return server ;
@ -1103,29 +1077,15 @@ void grpc_server_start(grpc_server *server) {
server - > started = true ;
server - > pollset_count = 0 ;
server - > pollsets = gpr_malloc ( sizeof ( grpc_pollset * ) * server - > cq_count ) ;
server - > request_freelist_per_cq =
gpr_malloc ( sizeof ( * server - > request_freelist_per_cq ) * server - > cq_count ) ;
server - > requested_calls_per_cq =
gpr_malloc ( sizeof ( * server - > requested_calls_per_cq ) * server - > cq_count ) ;
for ( i = 0 ; i < server - > cq_count ; i + + ) {
if ( grpc_cq_can_listen ( server - > cqs [ i ] ) ) {
server - > pollsets [ server - > pollset_count + + ] =
grpc_cq_pollset ( server - > cqs [ i ] ) ;
}
server - > request_freelist_per_cq [ i ] =
gpr_stack_lockfree_create ( ( size_t ) server - > max_requested_calls_per_cq ) ;
for ( int j = 0 ; j < server - > max_requested_calls_per_cq ; j + + ) {
gpr_stack_lockfree_push ( server - > request_freelist_per_cq [ i ] , j ) ;
}
server - > requested_calls_per_cq [ i ] =
gpr_malloc ( ( size_t ) server - > max_requested_calls_per_cq *
sizeof ( * server - > requested_calls_per_cq [ i ] ) ) ;
}
request_matcher_init ( & server - > unregistered_request_matcher ,
( size_t ) server - > max_requested_calls_per_cq , server ) ;
request_matcher_init ( & server - > unregistered_request_matcher , server ) ;
for ( registered_method * rm = server - > registered_methods ; rm ; rm = rm - > next ) {
request_matcher_init ( & rm - > request_matcher ,
( size_t ) server - > max_requested_calls_per_cq , server ) ;
request_matcher_init ( & rm - > request_matcher , server ) ;
}
server_ref ( server ) ;
@ -1379,21 +1339,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
requested_call * rc ) {
call_data * calld = NULL ;
request_matcher * rm = NULL ;
int request_id ;
if ( gpr_atm_acq_load ( & server - > shutdown_flag ) ) {
fail_call ( exec_ctx , server , cq_idx , rc ,
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Server Shutdown " ) ) ;
return GRPC_CALL_OK ;
}
request_id = gpr_stack_lockfree_pop ( server - > request_freelist_per_cq [ cq_idx ] ) ;
if ( request_id = = - 1 ) {
/* out of request ids: just fail this one */
fail_call ( exec_ctx , server , cq_idx , rc ,
grpc_error_set_int (
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Out of request ids " ) ,
GRPC_ERROR_INT_LIMIT , server - > max_requested_calls_per_cq ) ) ;
return GRPC_CALL_OK ;
}
switch ( rc - > type ) {
case BATCH_CALL :
rm = & server - > unregistered_request_matcher ;
@ -1402,15 +1352,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = & rc - > data . registered . registered_method - > request_matcher ;
break ;
}
server - > requested_calls_per_cq [ cq_idx ] [ request_id ] = * rc ;
gpr_free ( rc ) ;
if ( gpr_stack_lockfree_push ( rm - > requests_per_cq [ cq_idx ] , request_id ) ) {
if ( gpr_locked_mpscq_push ( & rm - > requests_per_cq [ cq_idx ] , & rc - > request_link ) ) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock ( & server - > mu_call ) ;
while ( ( calld = rm - > pending_head ) ! = NULL ) {
request_id = gpr_stack_lockfree_pop ( rm - > requests_per_cq [ cq_idx ] ) ;
if ( request_id = = - 1 ) break ;
rc = ( requested_call * ) gpr_locked_mpscq_pop ( & rm - > requests_per_cq [ cq_idx ] ) ;
if ( rc = = NULL ) break ;
rm - > pending_head = calld - > pending_next ;
gpr_mu_unlock ( & server - > mu_call ) ;
gpr_mu_lock ( & calld - > mu_state ) ;
@ -1426,8 +1374,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT ( calld - > state = = PENDING ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
publish_call ( exec_ctx , server , calld , cq_idx ,
& server - > requested_calls_per_cq [ cq_idx ] [ request_id ] ) ;
publish_call ( exec_ctx , server , calld , cq_idx , rc ) ;
}
gpr_mu_lock ( & server - > mu_call ) ;
}
@ -1534,7 +1481,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc - > initial_metadata - > count = 0 ;
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
server_ref ( server ) ;
grpc_cq_end_op ( exec_ctx , server - > cqs [ cq_idx ] , rc - > tag , error ,
done_request_event , rc , & rc - > completion ) ;
}