@ -36,22 +36,22 @@
# include <stdlib.h>
# include <string.h>
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/string_util.h>
# include <grpc/support/useful.h>
# include "src/core/channel/census_filter.h"
# include "src/core/channel/channel_args.h"
# include "src/core/channel/connected_channel.h"
# include "src/core/iomgr/iomgr.h"
# include "src/core/support/stack_lockfree.h"
# include "src/core/support/string.h"
# include "src/core/surface/call.h"
# include "src/core/surface/channel.h"
# include "src/core/surface/completion_queue.h"
# include "src/core/surface/init.h"
# include "src/core/transport/metadata.h"
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/string_util.h>
# include <grpc/support/useful.h>
typedef enum { PENDING_START , CALL_LIST_COUNT } call_list ;
typedef struct listener {
void * arg ;
@ -72,12 +72,14 @@ typedef struct {
typedef enum { BATCH_CALL , REGISTERED_CALL } requested_call_type ;
typedef struct {
typedef struct requested_call {
requested_call_type type ;
void * tag ;
grpc_server * server ;
grpc_completion_queue * cq_bound_to_call ;
grpc_completion_queue * cq_for_notification ;
grpc_call * * call ;
grpc_cq_completion completion ;
union {
struct {
grpc_call_details * details ;
@ -92,20 +94,6 @@ typedef struct {
} data ;
} requested_call ;
typedef struct {
requested_call * calls ;
size_t count ;
size_t capacity ;
} requested_call_array ;
struct registered_method {
char * method ;
char * host ;
call_data * pending ;
requested_call_array requested ;
registered_method * next ;
} ;
typedef struct channel_registered_method {
registered_method * server_registered_method ;
grpc_mdstr * method ;
@ -131,46 +119,9 @@ struct channel_data {
typedef struct shutdown_tag {
void * tag ;
grpc_completion_queue * cq ;
grpc_cq_completion completion ;
} shutdown_tag ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
grpc_channel_args * channel_args ;
grpc_completion_queue * * cqs ;
grpc_pollset * * pollsets ;
size_t cq_count ;
/* The two following mutexes control access to server-state
mu_global controls access to non - call - related state ( e . g . , channel state )
mu_call controls access to call - related state ( e . g . , the call lists )
If they are ever required to be nested , you must lock mu_global
before mu_call . This is currently used in shutdown processing
( grpc_server_shutdown_and_notify and maybe_finish_shutdown ) */
gpr_mu mu_global ; /* mutex for server and channel state */
gpr_mu mu_call ; /* mutex for call-specific state */
registered_method * registered_methods ;
requested_call_array requested_calls ;
gpr_uint8 shutdown ;
gpr_uint8 shutdown_published ;
size_t num_shutdown_tags ;
shutdown_tag * shutdown_tags ;
call_data * lists [ CALL_LIST_COUNT ] ;
channel_data root_channel_data ;
listener * listeners ;
int listeners_destroyed ;
gpr_refcount internal_refcount ;
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time ;
} ;
typedef enum {
/* waiting for metadata */
NOT_STARTED ,
@ -182,6 +133,8 @@ typedef enum {
ZOMBIED
} call_state ;
typedef struct request_matcher request_matcher ;
struct call_data {
grpc_call * call ;
@ -204,8 +157,20 @@ struct call_data {
grpc_iomgr_closure server_on_recv ;
grpc_iomgr_closure kill_zombie_closure ;
call_data * * root [ CALL_LIST_COUNT ] ;
call_link links [ CALL_LIST_COUNT ] ;
call_data * pending_next ;
} ;
struct request_matcher {
call_data * pending_head ;
call_data * pending_tail ;
gpr_stack_lockfree * requests ;
} ;
struct registered_method {
char * method ;
char * host ;
request_matcher request_matcher ;
registered_method * next ;
} ;
typedef struct {
@ -213,6 +178,48 @@ typedef struct {
size_t num_channels ;
} channel_broadcaster ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
grpc_channel_args * channel_args ;
grpc_completion_queue * * cqs ;
grpc_pollset * * pollsets ;
size_t cq_count ;
/* The two following mutexes control access to server-state
mu_global controls access to non - call - related state ( e . g . , channel state )
mu_call controls access to call - related state ( e . g . , the call lists )
If they are ever required to be nested , you must lock mu_global
before mu_call . This is currently used in shutdown processing
( grpc_server_shutdown_and_notify and maybe_finish_shutdown ) */
gpr_mu mu_global ; /* mutex for server and channel state */
gpr_mu mu_call ; /* mutex for call-specific state */
registered_method * registered_methods ;
request_matcher unregistered_request_matcher ;
/** free list of available requested_calls indices */
gpr_stack_lockfree * request_freelist ;
/** requested call backing data */
requested_call * requested_calls ;
int max_requested_calls ;
gpr_atm shutdown_flag ;
gpr_uint8 shutdown_published ;
size_t num_shutdown_tags ;
shutdown_tag * shutdown_tags ;
channel_data root_channel_data ;
listener * listeners ;
int listeners_destroyed ;
gpr_refcount internal_refcount ;
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time ;
} ;
# define SERVER_FROM_CALL_ELEM(elem) \
( ( ( channel_data * ) ( elem ) - > channel_data ) - > server )
@ -223,7 +230,9 @@ static void fail_call(grpc_server *server, requested_call *rc);
hold mu_call */
static void maybe_finish_shutdown ( grpc_server * server ) ;
/* channel broadcaster */
/*
* channel broadcaster
*/
/* assumes server locked */
static void channel_broadcaster_init ( grpc_server * s , channel_broadcaster * cb ) {
@ -273,7 +282,8 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
}
static void channel_broadcaster_shutdown ( channel_broadcaster * cb ,
int send_goaway , int force_disconnect ) {
int send_goaway ,
int force_disconnect ) {
size_t i ;
for ( i = 0 ; i < cb - > num_channels ; i + + ) {
@ -283,71 +293,44 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
gpr_free ( cb - > channels ) ;
}
/* call list */
static int call_list_join ( call_data * * root , call_data * call , call_list list ) {
GPR_ASSERT ( ! call - > root [ list ] ) ;
call - > root [ list ] = root ;
if ( ! * root ) {
* root = call ;
call - > links [ list ] . next = call - > links [ list ] . prev = call ;
} else {
call - > links [ list ] . next = * root ;
call - > links [ list ] . prev = ( * root ) - > links [ list ] . prev ;
call - > links [ list ] . next - > links [ list ] . prev =
call - > links [ list ] . prev - > links [ list ] . next = call ;
}
return 1 ;
}
/*
* request_matcher
*/
static call_data * call_list_remove_head ( call_data * * root , call_list list ) {
call_data * out = * root ;
if ( out ) {
out - > root [ list ] = NULL ;
if ( out - > links [ list ] . next = = out ) {
* root = NULL ;
} else {
* root = out - > links [ list ] . next ;
out - > links [ list ] . next - > links [ list ] . prev = out - > links [ list ] . prev ;
out - > links [ list ] . prev - > links [ list ] . next = out - > links [ list ] . next ;
}
}
return out ;
static void request_matcher_init ( request_matcher * request_matcher ,
int entries ) {
memset ( request_matcher , 0 , sizeof ( * request_matcher ) ) ;
request_matcher - > requests = gpr_stack_lockfree_create ( entries ) ;
}
static int call_list_remove ( call_data * call , call_list list ) {
call_data * * root = call - > root [ list ] ;
if ( root = = NULL ) return 0 ;
call - > root [ list ] = NULL ;
if ( * root = = call ) {
* root = call - > links [ list ] . next ;
if ( * root = = call ) {
* root = NULL ;
return 1 ;
}
}
GPR_ASSERT ( * root ! = call ) ;
call - > links [ list ] . next - > links [ list ] . prev = call - > links [ list ] . prev ;
call - > links [ list ] . prev - > links [ list ] . next = call - > links [ list ] . next ;
return 1 ;
static void request_matcher_destroy ( request_matcher * request_matcher ) {
GPR_ASSERT ( gpr_stack_lockfree_pop ( request_matcher - > requests ) = = - 1 ) ;
gpr_stack_lockfree_destroy ( request_matcher - > requests ) ;
}
static void requested_call_array_destroy ( requested_call_array * array ) {
gpr_free ( array - > calls ) ;
static void kill_zombie ( void * elem , int success ) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
}
static requested_call * requested_call_array_add ( requested_call_array * array ) {
requested_call * rc ;
if ( array - > count = = array - > capacity ) {
array - > capacity = GPR_MAX ( array - > capacity + 8 , array - > capacity * 2 ) ;
array - > calls =
gpr_realloc ( array - > calls , sizeof ( requested_call ) * array - > capacity ) ;
static void request_matcher_zombify_all_pending_calls (
request_matcher * request_matcher ) {
while ( request_matcher - > pending_head ) {
call_data * calld = request_matcher - > pending_head ;
request_matcher - > pending_head = calld - > pending_next ;
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = ZOMBIED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
grpc_iomgr_closure_init (
& calld - > kill_zombie_closure , kill_zombie ,
grpc_call_stack_element ( grpc_call_get_call_stack ( calld - > call ) , 0 ) ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
}
rc = & array - > calls [ array - > count + + ] ;
memset ( rc , 0 , sizeof ( * rc ) ) ;
return rc ;
}
/*
* server proper
*/
static void server_ref ( grpc_server * server ) {
gpr_ref ( & server - > internal_refcount ) ;
}
@ -359,20 +342,22 @@ static void server_delete(grpc_server *server) {
gpr_mu_destroy ( & server - > mu_global ) ;
gpr_mu_destroy ( & server - > mu_call ) ;
gpr_free ( server - > channel_filters ) ;
requested_call_array_destroy ( & server - > requested_calls ) ;
while ( ( rm = server - > registered_methods ) ! = NULL ) {
server - > registered_methods = rm - > next ;
request_matcher_destroy ( & rm - > request_matcher ) ;
gpr_free ( rm - > method ) ;
gpr_free ( rm - > host ) ;
requested_call_array_destroy ( & rm - > requested ) ;
gpr_free ( rm ) ;
}
for ( i = 0 ; i < server - > cq_count ; i + + ) {
GRPC_CQ_INTERNAL_UNREF ( server - > cqs [ i ] , " server " ) ;
}
request_matcher_destroy ( & server - > unregistered_request_matcher ) ;
gpr_stack_lockfree_destroy ( server - > request_freelist ) ;
gpr_free ( server - > cqs ) ;
gpr_free ( server - > pollsets ) ;
gpr_free ( server - > shutdown_tags ) ;
gpr_free ( server - > requested_calls ) ;
gpr_free ( server ) ;
}
@ -411,24 +396,29 @@ static void destroy_channel(channel_data *chand) {
}
static void finish_start_new_rpc ( grpc_server * server , grpc_call_element * elem ,
call_data * * pending_root ,
requested_call_array * array ) {
requested_call rc ;
request_matcher * request_matcher ) {
call_data * calld = elem - > call_data ;
int request_id ;
request_id = gpr_stack_lockfree_pop ( request_matcher - > requests ) ;
if ( request_id = = - 1 ) {
gpr_mu_lock ( & server - > mu_call ) ;
if ( array - > count = = 0 ) {
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = PENDING ;
gpr_mu_unlock ( & calld - > mu_state ) ;
call_list_join ( pending_root , calld , PENDING_START ) ;
if ( request_matcher - > pending_head = = NULL ) {
request_matcher - > pending_tail = request_matcher - > pending_head = calld ;
} else {
request_matcher - > pending_tail - > pending_next = calld ;
request_matcher - > pending_tail = calld ;
}
calld - > pending_next = NULL ;
gpr_mu_unlock ( & server - > mu_call ) ;
} else {
rc = array - > calls [ - - array - > count ] ;
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
gpr_mu_unlock ( & server - > mu_call ) ;
begin_call ( server , calld , & rc ) ;
begin_call ( server , calld , & server - > requested_calls [ request_id ] ) ;
}
}
@ -450,8 +440,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if ( ! rm ) break ;
if ( rm - > host ! = calld - > host ) continue ;
if ( rm - > method ! = calld - > path ) continue ;
finish_start_new_rpc ( server , elem , & rm - > server_registered_method - > pending ,
& rm - > server_registered_method - > requested ) ;
finish_start_new_rpc ( server , elem ,
& rm - > server_registered_method - > request_matcher ) ;
return ;
}
/* check for a wildcard method definition (no host set) */
@ -462,17 +452,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if ( ! rm ) break ;
if ( rm - > host ! = NULL ) continue ;
if ( rm - > method ! = calld - > path ) continue ;
finish_start_new_rpc ( server , elem , & rm - > server_registered_method - > pending ,
& rm - > server_registered_method - > requested ) ;
finish_start_new_rpc ( server , elem ,
& rm - > server_registered_method - > request_matcher ) ;
return ;
}
}
finish_start_new_rpc ( server , elem , & server - > lists [ PENDING_START ] ,
& server - > requested_calls ) ;
}
static void kill_zombie ( void * elem , int success ) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
finish_start_new_rpc ( server , elem , & server - > unregistered_request_matcher ) ;
}
static int num_listeners ( grpc_server * server ) {
@ -484,6 +469,10 @@ static int num_listeners(grpc_server *server) {
return n ;
}
static void done_shutdown_event ( void * server , grpc_cq_completion * completion ) {
server_unref ( server ) ;
}
static int num_channels ( grpc_server * server ) {
channel_data * chand ;
int n = 0 ;
@ -496,16 +485,16 @@ static int num_channels(grpc_server *server) {
static void maybe_finish_shutdown ( grpc_server * server ) {
size_t i ;
if ( ! server - > shutdown | | server - > shutdown_published ) {
if ( ! gpr_atm_acq_load ( & server - > shutdown_flag ) | | server - > shutdown_published ) {
return ;
}
if ( server - > root_channel_data . next ! = & server - > root_channel_data | |
server - > listeners_destroyed < num_listeners ( server ) ) {
if ( gpr_time_cmp (
gpr_time_sub ( gpr_now ( ) , server - > last_shutdown_message_time ) ,
gpr_time_from_seconds ( 1 ) ) > = 0 ) {
server - > last_shutdown_message_time = gpr_now ( ) ;
if ( gpr_time_cmp ( gpr_time_sub ( gpr_now ( GPR_CLOCK_REALTIME ) ,
server - > last_shutdown_message_time ) ,
gpr_time_from_seconds ( 1 , GPR_TIMESPAN ) ) > = 0 ) {
server - > last_shutdown_message_time = gpr_now ( GPR_CLOCK_REALTIME ) ;
gpr_log ( GPR_DEBUG ,
" Waiting for %d channels and %d/%d listeners to be destroyed "
" before shutting down server " ,
@ -517,8 +506,10 @@ static void maybe_finish_shutdown(grpc_server *server) {
}
server - > shutdown_published = 1 ;
for ( i = 0 ; i < server - > num_shutdown_tags ; i + + ) {
grpc_cq_end_op ( server - > shutdown_tags [ i ] . cq , server - > shutdown_tags [ i ] . tag ,
NULL , 1 ) ;
server_ref ( server ) ;
grpc_cq_end_op ( server - > shutdown_tags [ i ] . cq , server - > shutdown_tags [ i ] . tag , 1 ,
done_shutdown_event , server ,
& server - > shutdown_tags [ i ] . completion ) ;
}
}
@ -539,7 +530,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv ( void * ptr , int success ) {
grpc_call_element * elem = ptr ;
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
if ( success & & ! calld - > got_initial_metadata ) {
size_t i ;
@ -549,7 +539,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_stream_op * op = & ops [ i ] ;
if ( op - > type ! = GRPC_OP_METADATA ) continue ;
grpc_metadata_batch_filter ( & op - > data . metadata , server_filter , elem ) ;
if ( 0 ! = gpr_time_cmp ( op - > data . metadata . deadline , gpr_inf_future ) ) {
if ( 0 ! = gpr_time_cmp ( op - > data . metadata . deadline ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ) {
calld - > deadline = op - > data . metadata . deadline ;
}
calld - > got_initial_metadata = 1 ;
@ -584,11 +575,8 @@ static void server_on_recv(void *ptr, int success) {
} else if ( calld - > state = = PENDING ) {
calld - > state = ZOMBIED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
gpr_mu_lock ( & chand - > server - > mu_call ) ;
call_list_remove ( calld , PENDING_START ) ;
gpr_mu_unlock ( & chand - > server - > mu_call ) ;
grpc_iomgr_closure_init ( & calld - > kill_zombie_closure , kill_zombie , elem ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
/* zombied call will be destroyed when it's removed from the pending
queue . . . later */
} else {
gpr_mu_unlock ( & calld - > mu_state ) ;
}
@ -623,7 +611,7 @@ static void accept_stream(void *cd, grpc_transport *transport,
channel_data * chand = cd ;
/* create a call */
grpc_call_create ( chand - > channel , NULL , transport_server_data , NULL , 0 ,
gpr_inf_future ) ;
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ;
}
static void channel_connectivity_changed ( void * cd , int iomgr_status_ignored ) {
@ -651,7 +639,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
memset ( calld , 0 , sizeof ( call_data ) ) ;
calld - > deadline = gpr_inf_future ;
calld - > deadline = gpr_inf_future ( GPR_CLOCK_REALTIME ) ;
calld - > call = grpc_call_from_top_element ( elem ) ;
gpr_mu_init ( & calld - > mu_state ) ;
@ -666,11 +654,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( calld - > state = = PENDING ) {
gpr_mu_lock ( & chand - > server - > mu_call ) ;
call_list_remove ( elem - > call_data , PENDING_START ) ;
gpr_mu_unlock ( & chand - > server - > mu_call ) ;
}
GPR_ASSERT ( calld - > state ! = PENDING ) ;
if ( calld - > host ) {
GRPC_MDSTR_UNREF ( calld - > host ) ;
@ -777,6 +761,18 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server - > root_channel_data . next = server - > root_channel_data . prev =
& server - > root_channel_data ;
/* TODO(ctiller): expose a channel_arg for this */
server - > max_requested_calls = 32768 ;
server - > request_freelist =
gpr_stack_lockfree_create ( server - > max_requested_calls ) ;
for ( i = 0 ; i < ( size_t ) server - > max_requested_calls ; i + + ) {
gpr_stack_lockfree_push ( server - > request_freelist , i ) ;
}
request_matcher_init ( & server - > unregistered_request_matcher ,
server - > max_requested_calls ) ;
server - > requested_calls = gpr_malloc ( server - > max_requested_calls *
sizeof ( * server - > requested_calls ) ) ;
/* Server filter stack is:
server_surface_filter - for making surface API calls
@ -824,6 +820,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc ( sizeof ( registered_method ) ) ;
memset ( m , 0 , sizeof ( * m ) ) ;
request_matcher_init ( & m - > request_matcher , server - > max_requested_calls ) ;
m - > method = gpr_strdup ( method ) ;
m - > host = gpr_strdup ( host ) ;
m - > next = server - > registered_methods ;
@ -939,64 +936,87 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op ( transport , & op ) ;
}
typedef struct {
requested_call * * requests ;
size_t count ;
size_t capacity ;
} request_killer ;
static void request_killer_init ( request_killer * rk ) {
memset ( rk , 0 , sizeof ( * rk ) ) ;
}
static void request_killer_add ( request_killer * rk , requested_call * rc ) {
if ( rk - > capacity = = rk - > count ) {
rk - > capacity = GPR_MAX ( 8 , rk - > capacity * 2 ) ;
rk - > requests =
gpr_realloc ( rk - > requests , rk - > capacity * sizeof ( * rk - > requests ) ) ;
}
rk - > requests [ rk - > count + + ] = rc ;
}
static void request_killer_add_request_matcher ( request_killer * rk ,
grpc_server * server ,
request_matcher * rm ) {
int request_id ;
while ( ( request_id = gpr_stack_lockfree_pop ( rm - > requests ) ) ! = - 1 ) {
request_killer_add ( rk , & server - > requested_calls [ request_id ] ) ;
}
}
static void request_killer_run ( request_killer * rk , grpc_server * server ) {
size_t i ;
for ( i = 0 ; i < rk - > count ; i + + ) {
fail_call ( server , rk - > requests [ i ] ) ;
}
gpr_free ( rk - > requests ) ;
}
void grpc_server_shutdown_and_notify ( grpc_server * server ,
grpc_completion_queue * cq , void * tag ) {
listener * l ;
requested_call_array requested_calls ;
size_t i ;
registered_method * rm ;
shutdown_tag * sdt ;
channel_broadcaster broadcaster ;
request_killer reqkill ;
/* lock, and gather up some stuff to do */
gpr_mu_lock ( & server - > mu_global ) ;
grpc_cq_begin_op ( cq , NULL ) ;
grpc_cq_begin_op ( cq ) ;
server - > shutdown_tags =
gpr_realloc ( server - > shutdown_tags ,
sizeof ( shutdown_tag ) * ( server - > num_shutdown_tags + 1 ) ) ;
sdt = & server - > shutdown_tags [ server - > num_shutdown_tags + + ] ;
sdt - > tag = tag ;
sdt - > cq = cq ;
if ( server - > shutdown ) {
if ( gpr_atm_acq_load ( & server - > shutdown_flag ) ) {
gpr_mu_unlock ( & server - > mu_global ) ;
return ;
}
server - > last_shutdown_message_time = gpr_now ( ) ;
server - > last_shutdown_message_time = gpr_now ( GPR_CLOCK_REALTIME ) ;
channel_broadcaster_init ( server , & broadcaster ) ;
request_killer_init ( & reqkill ) ;
/* collect all unregistered then registered calls */
gpr_mu_lock ( & server - > mu_call ) ;
requested_calls = server - > requested_calls ;
memset ( & server - > requested_calls , 0 , sizeof ( server - > requested_calls ) ) ;
request_killer_add_request_matcher ( & reqkill , server ,
& server - > unregistered_request_matcher ) ;
request_matcher_zombify_all_pending_calls (
& server - > unregistered_request_matcher ) ;
for ( rm = server - > registered_methods ; rm ; rm = rm - > next ) {
if ( requested_calls . count + rm - > requested . count >
requested_calls . capacity ) {
requested_calls . capacity =
GPR_MAX ( requested_calls . count + rm - > requested . count ,
2 * requested_calls . capacity ) ;
requested_calls . calls =
gpr_realloc ( requested_calls . calls , sizeof ( * requested_calls . calls ) *
requested_calls . capacity ) ;
}
memcpy ( requested_calls . calls + requested_calls . count , rm - > requested . calls ,
sizeof ( * requested_calls . calls ) * rm - > requested . count ) ;
requested_calls . count + = rm - > requested . count ;
gpr_free ( rm - > requested . calls ) ;
memset ( & rm - > requested , 0 , sizeof ( rm - > requested ) ) ;
request_killer_add_request_matcher ( & reqkill , server , & rm - > request_matcher ) ;
request_matcher_zombify_all_pending_calls ( & rm - > request_matcher ) ;
}
gpr_mu_unlock ( & server - > mu_call ) ;
server - > shutdown = 1 ;
gpr_atm_rel_store ( & server - > shutdown_flag , 1 ) ;
maybe_finish_shutdown ( server ) ;
gpr_mu_unlock ( & server - > mu_global ) ;
/* terminate all the requested calls */
for ( i = 0 ; i < requested_calls . count ; i + + ) {
fail_call ( server , & requested_calls . calls [ i ] ) ;
}
gpr_free ( requested_calls . calls ) ;
request_killer_run ( & reqkill , server ) ;
/* Shutdown listeners */
for ( l = server - > listeners ; l ; l = l - > next ) {
@ -1028,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
listener * l ;
gpr_mu_lock ( & server - > mu_global ) ;
GPR_ASSERT ( server - > shutdown | | ! server - > listeners ) ;
GPR_ASSERT ( gpr_atm_acq_load ( & server - > shutdown_flag ) | | ! server - > listeners ) ;
GPR_ASSERT ( server - > listeners_destroyed = = num_listeners ( server ) ) ;
while ( server - > listeners ) {
@ -1058,38 +1078,55 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request ( grpc_server * server ,
requested_call * rc ) {
call_data * calld = NULL ;
requested_call_array * requested_calls = NULL ;
gpr_mu_lock ( & server - > mu_call ) ;
if ( server - > shutdown ) {
gpr_mu_unlock ( & server - > mu_call ) ;
request_matcher * request_matcher = NULL ;
int request_id ;
if ( gpr_atm_acq_load ( & server - > shutdown_flag ) ) {
fail_call ( server , rc ) ;
return GRPC_CALL_OK ;
}
request_id = gpr_stack_lockfree_pop ( server - > request_freelist ) ;
if ( request_id = = - 1 ) {
/* out of request ids: just fail this one */
fail_call ( server , rc ) ;
return GRPC_CALL_OK ;
}
switch ( rc - > type ) {
case BATCH_CALL :
calld =
call_list_remove_head ( & server - > lists [ PENDING_START ] , PENDING_START ) ;
requested_calls = & server - > requested_calls ;
request_matcher = & server - > unregistered_request_matcher ;
break ;
case REGISTERED_CALL :
calld = call_list_remove_head (
& rc - > data . registered . registered_method - > pending , PENDING_START ) ;
requested_calls = & rc - > data . registered . registered_method - > requested ;
request_matcher = & rc - > data . registered . registered_method - > request_matcher ;
break ;
}
if ( calld ! = NULL ) {
server - > requested_calls [ request_id ] = * rc ;
gpr_free ( rc ) ;
if ( gpr_stack_lockfree_push ( request_matcher - > requests , request_id ) ) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock ( & server - > mu_call ) ;
while ( ( calld = request_matcher - > pending_head ) ! = NULL ) {
request_id = gpr_stack_lockfree_pop ( request_matcher - > requests ) ;
if ( request_id = = - 1 ) break ;
request_matcher - > pending_head = calld - > pending_next ;
gpr_mu_unlock ( & server - > mu_call ) ;
gpr_mu_lock ( & calld - > mu_state ) ;
if ( calld - > state = = ZOMBIED ) {
gpr_mu_unlock ( & calld - > mu_state ) ;
grpc_iomgr_closure_init (
& calld - > kill_zombie_closure , kill_zombie ,
grpc_call_stack_element ( grpc_call_get_call_stack ( calld - > call ) , 0 ) ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
} else {
GPR_ASSERT ( calld - > state = = PENDING ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
begin_call ( server , calld , rc ) ;
return GRPC_CALL_OK ;
} else {
* requested_call_array_add ( requested_calls ) = * rc ;
begin_call ( server , calld , & se rver - > requested_calls [ request_id ] ) ;
}
gpr_mu_lock ( & server - > mu_call ) ;
}
gpr_mu_unlock ( & server - > mu_call ) ;
return GRPC_CALL_OK ;
}
return GRPC_CALL_OK ;
}
grpc_call_error grpc_server_request_call (
@ -1097,22 +1134,24 @@ grpc_call_error grpc_server_request_call(
grpc_metadata_array * initial_metadata ,
grpc_completion_queue * cq_bound_to_call ,
grpc_completion_queue * cq_for_notification , void * tag ) {
requested_call rc ;
requested_call * rc = gpr_malloc ( sizeof ( * rc ) ) ;
GRPC_SERVER_LOG_REQUEST_CALL ( GPR_INFO , server , call , details ,
initial_metadata , cq_bound_to_call ,
cq_for_notification , tag ) ;
if ( ! grpc_cq_is_server_cq ( cq_for_notification ) ) {
gpr_free ( rc ) ;
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE ;
}
grpc_cq_begin_op ( cq_for_notification , NULL ) ;
rc . type = BATCH_CALL ;
rc . tag = tag ;
rc . cq_bound_to_call = cq_bound_to_call ;
rc . cq_for_notification = cq_for_notification ;
rc . call = call ;
rc . data . batch . details = details ;
rc . data . batch . initial_metadata = initial_metadata ;
return queue_call_request ( server , & rc ) ;
grpc_cq_begin_op ( cq_for_notification ) ;
rc - > type = BATCH_CALL ;
rc - > server = server ;
rc - > tag = tag ;
rc - > cq_bound_to_call = cq_bound_to_call ;
rc - > cq_for_notification = cq_for_notification ;
rc - > call = call ;
rc - > data . batch . details = details ;
rc - > data . batch . initial_metadata = initial_metadata ;
return queue_call_request ( server , rc ) ;
}
grpc_call_error grpc_server_request_registered_call (
@ -1120,22 +1159,24 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array * initial_metadata , grpc_byte_buffer * * optional_payload ,
grpc_completion_queue * cq_bound_to_call ,
grpc_completion_queue * cq_for_notification , void * tag ) {
requested_call rc ;
requested_call * rc = gpr_malloc ( sizeof ( * rc ) ) ;
registered_method * registered_method = rm ;
if ( ! grpc_cq_is_server_cq ( cq_for_notification ) ) {
gpr_free ( rc ) ;
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE ;
}
grpc_cq_begin_op ( cq_for_notification , NULL ) ;
rc . type = REGISTERED_CALL ;
rc . tag = tag ;
rc . cq_bound_to_call = cq_bound_to_call ;
rc . cq_for_notification = cq_for_notification ;
rc . call = call ;
rc . data . registered . registered_method = registered_method ;
rc . data . registered . deadline = deadline ;
rc . data . registered . initial_metadata = initial_metadata ;
rc . data . registered . optional_payload = optional_payload ;
return queue_call_request ( server , & rc ) ;
grpc_cq_begin_op ( cq_for_notification ) ;
rc - > type = REGISTERED_CALL ;
rc - > server = server ;
rc - > tag = tag ;
rc - > cq_bound_to_call = cq_bound_to_call ;
rc - > cq_for_notification = cq_for_notification ;
rc - > call = call ;
rc - > data . registered . registered_method = registered_method ;
rc - > data . registered . deadline = deadline ;
rc - > data . registered . initial_metadata = initial_metadata ;
rc - > data . registered . optional_payload = optional_payload ;
return queue_call_request ( server , rc ) ;
}
static void publish_registered_or_batch ( grpc_call * call , int success ,
@ -1202,8 +1243,20 @@ static void begin_call(grpc_server *server, call_data *calld,
}
GRPC_CALL_INTERNAL_REF ( calld - > call , " server " ) ;
grpc_call_start_ioreq_and_call_back ( calld - > call , req , r - req , publish ,
rc - > tag ) ;
grpc_call_start_ioreq_and_call_back ( calld - > call , req , r - req , publish , rc ) ;
}
static void done_request_event ( void * req , grpc_cq_completion * c ) {
requested_call * rc = req ;
grpc_server * server = rc - > server ;
if ( rc > = server - > requested_calls & &
rc < server - > requested_calls + server - > max_requested_calls ) {
gpr_stack_lockfree_push ( server - > request_freelist ,
rc - server - > requested_calls ) ;
} else {
gpr_free ( req ) ;
}
}
static void fail_call ( grpc_server * server , requested_call * rc ) {
@ -1216,15 +1269,19 @@ static void fail_call(grpc_server *server, requested_call *rc) {
rc - > data . registered . initial_metadata - > count = 0 ;
break ;
}
grpc_cq_end_op ( rc - > cq_for_notification , rc - > tag , NULL , 0 ) ;
grpc_cq_end_op ( rc - > cq_for_notification , rc - > tag , 0 , done_request_event , rc ,
& rc - > completion ) ;
}
static void publish_registered_or_batch ( grpc_call * call , int success ,
void * tag ) {
void * prc ) {
grpc_call_element * elem =
grpc_call_stack_element ( grpc_call_get_call_stack ( call ) , 0 ) ;
requested_call * rc = prc ;
call_data * calld = elem - > call_data ;
grpc_cq_end_op ( calld - > cq_new , tag , call , success ) ;
grpc_cq_end_op ( calld - > cq_new , rc - > tag , success , done_request_event , rc ,
& rc - > completion ) ;
GRPC_CALL_INTERNAL_UNREF ( call , " server " , 0 ) ;
}
const grpc_channel_args * grpc_server_get_channel_args ( grpc_server * server ) {