@ -36,22 +36,22 @@
# include <stdlib.h>
# include <string.h>
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/string_util.h>
# include <grpc/support/useful.h>
# include "src/core/channel/census_filter.h"
# include "src/core/channel/channel_args.h"
# include "src/core/channel/connected_channel.h"
# include "src/core/iomgr/iomgr.h"
# include "src/core/support/stack_lockfree.h"
# include "src/core/support/string.h"
# include "src/core/surface/call.h"
# include "src/core/surface/channel.h"
# include "src/core/surface/completion_queue.h"
# include "src/core/surface/init.h"
# include "src/core/transport/metadata.h"
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/string_util.h>
# include <grpc/support/useful.h>
typedef enum { PENDING_START , CALL_LIST_COUNT } call_list ;
typedef struct listener {
void * arg ;
@ -74,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
typedef struct requested_call {
requested_call_type type ;
struct requested_call * next ;
void * tag ;
grpc_server * server ;
grpc_completion_queue * cq_bound_to_call ;
grpc_completion_queue * cq_for_notification ;
grpc_call * * call ;
@ -94,14 +94,6 @@ typedef struct requested_call {
} data ;
} requested_call ;
struct registered_method {
char * method ;
char * host ;
call_data * pending ;
requested_call * requests ;
registered_method * next ;
} ;
typedef struct channel_registered_method {
registered_method * server_registered_method ;
grpc_mdstr * method ;
@ -130,44 +122,6 @@ typedef struct shutdown_tag {
grpc_cq_completion completion ;
} shutdown_tag ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
grpc_channel_args * channel_args ;
grpc_completion_queue * * cqs ;
grpc_pollset * * pollsets ;
size_t cq_count ;
/* The two following mutexes control access to server-state
mu_global controls access to non - call - related state ( e . g . , channel state )
mu_call controls access to call - related state ( e . g . , the call lists )
If they are ever required to be nested , you must lock mu_global
before mu_call . This is currently used in shutdown processing
( grpc_server_shutdown_and_notify and maybe_finish_shutdown ) */
gpr_mu mu_global ; /* mutex for server and channel state */
gpr_mu mu_call ; /* mutex for call-specific state */
registered_method * registered_methods ;
requested_call * requests ;
gpr_uint8 shutdown ;
gpr_uint8 shutdown_published ;
size_t num_shutdown_tags ;
shutdown_tag * shutdown_tags ;
call_data * lists [ CALL_LIST_COUNT ] ;
channel_data root_channel_data ;
listener * listeners ;
int listeners_destroyed ;
gpr_refcount internal_refcount ;
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time ;
} ;
typedef enum {
/* waiting for metadata */
NOT_STARTED ,
@ -179,6 +133,8 @@ typedef enum {
ZOMBIED
} call_state ;
typedef struct request_matcher request_matcher ;
struct call_data {
grpc_call * call ;
@ -201,8 +157,20 @@ struct call_data {
grpc_iomgr_closure server_on_recv ;
grpc_iomgr_closure kill_zombie_closure ;
call_data * * root [ CALL_LIST_COUNT ] ;
call_link links [ CALL_LIST_COUNT ] ;
call_data * pending_next ;
} ;
struct request_matcher {
call_data * pending_head ;
call_data * pending_tail ;
gpr_stack_lockfree * requests ;
} ;
struct registered_method {
char * method ;
char * host ;
request_matcher request_matcher ;
registered_method * next ;
} ;
typedef struct {
@ -210,6 +178,48 @@ typedef struct {
size_t num_channels ;
} channel_broadcaster ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
grpc_channel_args * channel_args ;
grpc_completion_queue * * cqs ;
grpc_pollset * * pollsets ;
size_t cq_count ;
/* The two following mutexes control access to server-state
mu_global controls access to non - call - related state ( e . g . , channel state )
mu_call controls access to call - related state ( e . g . , the call lists )
If they are ever required to be nested , you must lock mu_global
before mu_call . This is currently used in shutdown processing
( grpc_server_shutdown_and_notify and maybe_finish_shutdown ) */
gpr_mu mu_global ; /* mutex for server and channel state */
gpr_mu mu_call ; /* mutex for call-specific state */
registered_method * registered_methods ;
request_matcher unregistered_request_matcher ;
/** free list of available requested_calls indices */
gpr_stack_lockfree * request_freelist ;
/** requested call backing data */
requested_call * requested_calls ;
int max_requested_calls ;
gpr_atm shutdown_flag ;
gpr_uint8 shutdown_published ;
size_t num_shutdown_tags ;
shutdown_tag * shutdown_tags ;
channel_data root_channel_data ;
listener * listeners ;
int listeners_destroyed ;
gpr_refcount internal_refcount ;
/** when did we print the last shutdown progress message */
gpr_timespec last_shutdown_message_time ;
} ;
# define SERVER_FROM_CALL_ELEM(elem) \
( ( ( channel_data * ) ( elem ) - > channel_data ) - > server )
@ -220,7 +230,9 @@ static void fail_call(grpc_server *server, requested_call *rc);
hold mu_call */
static void maybe_finish_shutdown ( grpc_server * server ) ;
/* channel broadcaster */
/*
* channel broadcaster
*/
/* assumes server locked */
static void channel_broadcaster_init ( grpc_server * s , channel_broadcaster * cb ) {
@ -281,55 +293,44 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
gpr_free ( cb - > channels ) ;
}
/* call list */
/*
* request_matcher
*/
static int call_list_join ( call_data * * root , call_data * call , call_list list ) {
GPR_ASSERT ( ! call - > root [ list ] ) ;
call - > root [ list ] = root ;
if ( ! * root ) {
* root = call ;
call - > links [ list ] . next = call - > links [ list ] . prev = call ;
} else {
call - > links [ list ] . next = * root ;
call - > links [ list ] . prev = ( * root ) - > links [ list ] . prev ;
call - > links [ list ] . next - > links [ list ] . prev =
call - > links [ list ] . prev - > links [ list ] . next = call ;
}
return 1 ;
static void request_matcher_init ( request_matcher * request_matcher ,
int entries ) {
memset ( request_matcher , 0 , sizeof ( * request_matcher ) ) ;
request_matcher - > requests = gpr_stack_lockfree_create ( entries ) ;
}
static call_data * call_list_remove_head ( call_data * * root , call_list list ) {
call_data * out = * root ;
if ( out ) {
out - > root [ list ] = NULL ;
if ( out - > links [ list ] . next = = out ) {
* root = NULL ;
} else {
* root = out - > links [ list ] . next ;
out - > links [ list ] . next - > links [ list ] . prev = out - > links [ list ] . prev ;
out - > links [ list ] . prev - > links [ list ] . next = out - > links [ list ] . next ;
}
}
return out ;
static void request_matcher_destroy ( request_matcher * request_matcher ) {
GPR_ASSERT ( gpr_stack_lockfree_pop ( request_matcher - > requests ) = = - 1 ) ;
gpr_stack_lockfree_destroy ( request_matcher - > requests ) ;
}
static int call_list_remove ( call_data * call , call_list list ) {
call_data * * root = call - > root [ list ] ;
if ( root = = NULL ) return 0 ;
call - > root [ list ] = NULL ;
if ( * root = = call ) {
* root = call - > links [ list ] . next ;
if ( * root = = call ) {
* root = NULL ;
return 1 ;
}
}
GPR_ASSERT ( * root ! = call ) ;
call - > links [ list ] . next - > links [ list ] . prev = call - > links [ list ] . prev ;
call - > links [ list ] . prev - > links [ list ] . next = call - > links [ list ] . next ;
return 1 ;
static void kill_zombie ( void * elem , int success ) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
}
static void request_matcher_zombify_all_pending_calls (
request_matcher * request_matcher ) {
while ( request_matcher - > pending_head ) {
call_data * calld = request_matcher - > pending_head ;
request_matcher - > pending_head = calld - > pending_next ;
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = ZOMBIED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
grpc_iomgr_closure_init (
& calld - > kill_zombie_closure , kill_zombie ,
grpc_call_stack_element ( grpc_call_get_call_stack ( calld - > call ) , 0 ) ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
}
}
/*
* server proper
*/
static void server_ref ( grpc_server * server ) {
gpr_ref ( & server - > internal_refcount ) ;
}
@ -343,6 +344,7 @@ static void server_delete(grpc_server *server) {
gpr_free ( server - > channel_filters ) ;
while ( ( rm = server - > registered_methods ) ! = NULL ) {
server - > registered_methods = rm - > next ;
request_matcher_destroy ( & rm - > request_matcher ) ;
gpr_free ( rm - > method ) ;
gpr_free ( rm - > host ) ;
gpr_free ( rm ) ;
@ -350,9 +352,12 @@ static void server_delete(grpc_server *server) {
for ( i = 0 ; i < server - > cq_count ; i + + ) {
GRPC_CQ_INTERNAL_UNREF ( server - > cqs [ i ] , " server " ) ;
}
request_matcher_destroy ( & server - > unregistered_request_matcher ) ;
gpr_stack_lockfree_destroy ( server - > request_freelist ) ;
gpr_free ( server - > cqs ) ;
gpr_free ( server - > pollsets ) ;
gpr_free ( server - > shutdown_tags ) ;
gpr_free ( server - > requested_calls ) ;
gpr_free ( server ) ;
}
@ -391,25 +396,29 @@ static void destroy_channel(channel_data *chand) {
}
static void finish_start_new_rpc ( grpc_server * server , grpc_call_element * elem ,
call_data * * pending_root ,
requested_call * * requests ) {
requested_call * rc ;
request_matcher * request_matcher ) {
call_data * calld = elem - > call_data ;
int request_id ;
request_id = gpr_stack_lockfree_pop ( request_matcher - > requests ) ;
if ( request_id = = - 1 ) {
gpr_mu_lock ( & server - > mu_call ) ;
rc = * requests ;
if ( rc = = NULL ) {
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = PENDING ;
gpr_mu_unlock ( & calld - > mu_state ) ;
call_list_join ( pending_root , calld , PENDING_START ) ;
if ( request_matcher - > pending_head = = NULL ) {
request_matcher - > pending_tail = request_matcher - > pending_head = calld ;
} else {
request_matcher - > pending_tail - > pending_next = calld ;
request_matcher - > pending_tail = calld ;
}
calld - > pending_next = NULL ;
gpr_mu_unlock ( & server - > mu_call ) ;
} else {
* requests = rc - > next ;
gpr_mu_lock ( & calld - > mu_state ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
gpr_mu_unlock ( & server - > mu_call ) ;
begin_call ( server , calld , rc ) ;
begin_call ( server , calld , & server - > requested_calls [ request_id ] ) ;
}
}
@ -431,8 +440,8 @@ static void start_new_rpc(grpc_call_element *elem) {
if ( ! rm ) break ;
if ( rm - > host ! = calld - > host ) continue ;
if ( rm - > method ! = calld - > path ) continue ;
finish_start_new_rpc ( server , elem , & rm - > server_registered_method - > pending ,
& rm - > server_registered_method - > requests ) ;
finish_start_new_rpc ( server , elem ,
& rm - > server_registered_method - > request_matcher ) ;
return ;
}
/* check for a wildcard method definition (no host set) */
@ -443,17 +452,12 @@ static void start_new_rpc(grpc_call_element *elem) {
if ( ! rm ) break ;
if ( rm - > host ! = NULL ) continue ;
if ( rm - > method ! = calld - > path ) continue ;
finish_start_new_rpc ( server , elem , & rm - > server_registered_method - > pending ,
& rm - > server_registered_method - > requests ) ;
finish_start_new_rpc ( server , elem ,
& rm - > server_registered_method - > request_matcher ) ;
return ;
}
}
finish_start_new_rpc ( server , elem , & server - > lists [ PENDING_START ] ,
& server - > requests ) ;
}
static void kill_zombie ( void * elem , int success ) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
finish_start_new_rpc ( server , elem , & server - > unregistered_request_matcher ) ;
}
static int num_listeners ( grpc_server * server ) {
@ -481,15 +485,15 @@ static int num_channels(grpc_server *server) {
static void maybe_finish_shutdown ( grpc_server * server ) {
size_t i ;
if ( ! server - > shutdown | | server - > shutdown_published ) {
if ( ! gpr_atm_acq_load ( & server - > shutdown_flag ) | | server - > shutdown_published ) {
return ;
}
if ( server - > root_channel_data . next ! = & server - > root_channel_data | |
server - > listeners_destroyed < num_listeners ( server ) ) {
if ( gpr_time_cmp (
gpr_time_sub ( gpr_now ( GPR_CLOCK_REALTIME ) , server - > last_shutdown_message_time ) ,
gpr_time_from_seconds ( 1 ) ) > = 0 ) {
if ( gpr_time_cmp ( gpr_time_sub ( gpr_now ( GPR_CLOCK_REALTIME ) ,
server - > last_shutdown_message_time ) ,
gpr_time_from_seconds ( 1 , GPR_TIMESPAN ) ) > = 0 ) {
server - > last_shutdown_message_time = gpr_now ( GPR_CLOCK_REALTIME ) ;
gpr_log ( GPR_DEBUG ,
" Waiting for %d channels and %d/%d listeners to be destroyed "
@ -526,7 +530,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
static void server_on_recv ( void * ptr , int success ) {
grpc_call_element * elem = ptr ;
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
if ( success & & ! calld - > got_initial_metadata ) {
size_t i ;
@ -536,7 +539,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_stream_op * op = & ops [ i ] ;
if ( op - > type ! = GRPC_OP_METADATA ) continue ;
grpc_metadata_batch_filter ( & op - > data . metadata , server_filter , elem ) ;
if ( 0 ! = gpr_time_cmp ( op - > data . metadata . deadline , gpr_inf_future ) ) {
if ( 0 ! = gpr_time_cmp ( op - > data . metadata . deadline ,
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ) {
calld - > deadline = op - > data . metadata . deadline ;
}
calld - > got_initial_metadata = 1 ;
@ -571,11 +575,8 @@ static void server_on_recv(void *ptr, int success) {
} else if ( calld - > state = = PENDING ) {
calld - > state = ZOMBIED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
gpr_mu_lock ( & chand - > server - > mu_call ) ;
call_list_remove ( calld , PENDING_START ) ;
gpr_mu_unlock ( & chand - > server - > mu_call ) ;
grpc_iomgr_closure_init ( & calld - > kill_zombie_closure , kill_zombie , elem ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
/* zombied call will be destroyed when it's removed from the pending
queue . . . later */
} else {
gpr_mu_unlock ( & calld - > mu_state ) ;
}
@ -610,7 +611,7 @@ static void accept_stream(void *cd, grpc_transport *transport,
channel_data * chand = cd ;
/* create a call */
grpc_call_create ( chand - > channel , NULL , transport_server_data , NULL , 0 ,
gpr_inf_future ) ;
gpr_inf_future ( GPR_CLOCK_REALTIME ) ) ;
}
static void channel_connectivity_changed ( void * cd , int iomgr_status_ignored ) {
@ -638,7 +639,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
memset ( calld , 0 , sizeof ( call_data ) ) ;
calld - > deadline = gpr_inf_future ;
calld - > deadline = gpr_inf_future ( GPR_CLOCK_REALTIME ) ;
calld - > call = grpc_call_from_top_element ( elem ) ;
gpr_mu_init ( & calld - > mu_state ) ;
@ -653,11 +654,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
if ( calld - > state = = PENDING ) {
gpr_mu_lock ( & chand - > server - > mu_call ) ;
call_list_remove ( elem - > call_data , PENDING_START ) ;
gpr_mu_unlock ( & chand - > server - > mu_call ) ;
}
GPR_ASSERT ( calld - > state ! = PENDING ) ;
if ( calld - > host ) {
GRPC_MDSTR_UNREF ( calld - > host ) ;
@ -764,6 +761,18 @@ grpc_server *grpc_server_create_from_filters(
server - > root_channel_data . next = server - > root_channel_data . prev =
& server - > root_channel_data ;
/* TODO(ctiller): expose a channel_arg for this */
server - > max_requested_calls = 32768 ;
server - > request_freelist =
gpr_stack_lockfree_create ( server - > max_requested_calls ) ;
for ( i = 0 ; i < ( size_t ) server - > max_requested_calls ; i + + ) {
gpr_stack_lockfree_push ( server - > request_freelist , i ) ;
}
request_matcher_init ( & server - > unregistered_request_matcher ,
server - > max_requested_calls ) ;
server - > requested_calls = gpr_malloc ( server - > max_requested_calls *
sizeof ( * server - > requested_calls ) ) ;
/* Server filter stack is:
server_surface_filter - for making surface API calls
@ -811,6 +820,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
}
m = gpr_malloc ( sizeof ( registered_method ) ) ;
memset ( m , 0 , sizeof ( * m ) ) ;
request_matcher_init ( & m - > request_matcher , server - > max_requested_calls ) ;
m - > method = gpr_strdup ( method ) ;
m - > host = gpr_strdup ( host ) ;
m - > next = server - > registered_methods ;
@ -926,13 +936,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op ( transport , & op ) ;
}
typedef struct {
requested_call * * requests ;
size_t count ;
size_t capacity ;
} request_killer ;
static void request_killer_init ( request_killer * rk ) {
memset ( rk , 0 , sizeof ( * rk ) ) ;
}
static void request_killer_add ( request_killer * rk , requested_call * rc ) {
if ( rk - > capacity = = rk - > count ) {
rk - > capacity = GPR_MAX ( 8 , rk - > capacity * 2 ) ;
rk - > requests =
gpr_realloc ( rk - > requests , rk - > capacity * sizeof ( * rk - > requests ) ) ;
}
rk - > requests [ rk - > count + + ] = rc ;
}
static void request_killer_add_request_matcher ( request_killer * rk ,
grpc_server * server ,
request_matcher * rm ) {
int request_id ;
while ( ( request_id = gpr_stack_lockfree_pop ( rm - > requests ) ) ! = - 1 ) {
request_killer_add ( rk , & server - > requested_calls [ request_id ] ) ;
}
}
static void request_killer_run ( request_killer * rk , grpc_server * server ) {
size_t i ;
for ( i = 0 ; i < rk - > count ; i + + ) {
fail_call ( server , rk - > requests [ i ] ) ;
}
gpr_free ( rk - > requests ) ;
}
void grpc_server_shutdown_and_notify ( grpc_server * server ,
grpc_completion_queue * cq , void * tag ) {
listener * l ;
requested_call * requests = NULL ;
registered_method * rm ;
shutdown_tag * sdt ;
channel_broadcaster broadcaster ;
request_killer reqkill ;
/* lock, and gather up some stuff to do */
gpr_mu_lock ( & server - > mu_global ) ;
@ -943,7 +989,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt = & server - > shutdown_tags [ server - > num_shutdown_tags + + ] ;
sdt - > tag = tag ;
sdt - > cq = cq ;
if ( server - > shutdown ) {
if ( gpr_atm_acq_load ( & server - > shutdown_flag ) ) {
gpr_mu_unlock ( & server - > mu_global ) ;
return ;
}
@ -951,31 +997,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
server - > last_shutdown_message_time = gpr_now ( GPR_CLOCK_REALTIME ) ;
channel_broadcaster_init ( server , & broadcaster ) ;
request_killer_init ( & reqkill ) ;
/* collect all unregistered then registered calls */
gpr_mu_lock ( & server - > mu_call ) ;
requests = server - > requests ;
server - > requests = NULL ;
request_killer_add_request_matcher ( & reqkill , server ,
& server - > unregistered_request_matcher ) ;
request_matcher_zombify_all_pending_calls (
& server - > unregistered_request_matcher ) ;
for ( rm = server - > registered_methods ; rm ; rm = rm - > next ) {
while ( rm - > requests ! = NULL ) {
requested_call * c = rm - > requests ;
rm - > requests = c - > next ;
c - > next = requests ;
requests = c ;
}
request_killer_add_request_matcher ( & reqkill , server , & rm - > request_matcher ) ;
request_matcher_zombify_all_pending_calls ( & rm - > request_matcher ) ;
}
gpr_mu_unlock ( & server - > mu_call ) ;
server - > shutdown = 1 ;
gpr_atm_rel_store ( & server - > shutdown_flag , 1 ) ;
maybe_finish_shutdown ( server ) ;
gpr_mu_unlock ( & server - > mu_global ) ;
/* terminate all the requested calls */
while ( requests ! = NULL ) {
requested_call * next = requests - > next ;
fail_call ( server , requests ) ;
requests = next ;
}
request_killer_run ( & reqkill , server ) ;
/* Shutdown listeners */
for ( l = server - > listeners ; l ; l = l - > next ) {
@ -1007,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
listener * l ;
gpr_mu_lock ( & server - > mu_global ) ;
GPR_ASSERT ( server - > shutdown | | ! server - > listeners ) ;
GPR_ASSERT ( gpr_atm_acq_load ( & server - > shutdown_flag ) | | ! server - > listeners ) ;
GPR_ASSERT ( server - > listeners_destroyed = = num_listeners ( server ) ) ;
while ( server - > listeners ) {
@ -1037,39 +1078,55 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
static grpc_call_error queue_call_request ( grpc_server * server ,
requested_call * rc ) {
call_data * calld = NULL ;
requested_call * * requests = NULL ;
gpr_mu_lock ( & server - > mu_call ) ;
if ( server - > shutdown ) {
gpr_mu_unlock ( & server - > mu_call ) ;
request_matcher * request_matcher = NULL ;
int request_id ;
if ( gpr_atm_acq_load ( & server - > shutdown_flag ) ) {
fail_call ( server , rc ) ;
return GRPC_CALL_OK ;
}
request_id = gpr_stack_lockfree_pop ( server - > request_freelist ) ;
if ( request_id = = - 1 ) {
/* out of request ids: just fail this one */
fail_call ( server , rc ) ;
return GRPC_CALL_OK ;
}
switch ( rc - > type ) {
case BATCH_CALL :
calld =
call_list_remove_head ( & server - > lists [ PENDING_START ] , PENDING_START ) ;
requests = & server - > requests ;
request_matcher = & server - > unregistered_request_matcher ;
break ;
case REGISTERED_CALL :
calld = call_list_remove_head (
& rc - > data . registered . registered_method - > pending , PENDING_START ) ;
requests = & rc - > data . registered . registered_method - > requests ;
request_matcher = & rc - > data . registered . registered_method - > request_matcher ;
break ;
}
if ( calld ! = NULL ) {
server - > requested_calls [ request_id ] = * rc ;
gpr_free ( rc ) ;
if ( gpr_stack_lockfree_push ( request_matcher - > requests , request_id ) ) {
/* this was the first queued request: we need to lock and start
matching calls */
gpr_mu_lock ( & server - > mu_call ) ;
while ( ( calld = request_matcher - > pending_head ) ! = NULL ) {
request_id = gpr_stack_lockfree_pop ( request_matcher - > requests ) ;
if ( request_id = = - 1 ) break ;
request_matcher - > pending_head = calld - > pending_next ;
gpr_mu_unlock ( & server - > mu_call ) ;
gpr_mu_lock ( & calld - > mu_state ) ;
if ( calld - > state = = ZOMBIED ) {
gpr_mu_unlock ( & calld - > mu_state ) ;
grpc_iomgr_closure_init (
& calld - > kill_zombie_closure , kill_zombie ,
grpc_call_stack_element ( grpc_call_get_call_stack ( calld - > call ) , 0 ) ) ;
grpc_iomgr_add_callback ( & calld - > kill_zombie_closure ) ;
} else {
GPR_ASSERT ( calld - > state = = PENDING ) ;
calld - > state = ACTIVATED ;
gpr_mu_unlock ( & calld - > mu_state ) ;
begin_call ( server , calld , rc ) ;
return GRPC_CALL_OK ;
} else {
rc - > next = * requests ;
* requests = rc ;
begin_call ( server , calld , & server - > requested_calls [ request_id ] ) ;
}
gpr_mu_lock ( & server - > mu_call ) ;
}
gpr_mu_unlock ( & server - > mu_call ) ;
return GRPC_CALL_OK ;
}
return GRPC_CALL_OK ;
}
grpc_call_error grpc_server_request_call (
@ -1087,6 +1144,7 @@ grpc_call_error grpc_server_request_call(
}
grpc_cq_begin_op ( cq_for_notification ) ;
rc - > type = BATCH_CALL ;
rc - > server = server ;
rc - > tag = tag ;
rc - > cq_bound_to_call = cq_bound_to_call ;
rc - > cq_for_notification = cq_for_notification ;
@ -1109,6 +1167,7 @@ grpc_call_error grpc_server_request_registered_call(
}
grpc_cq_begin_op ( cq_for_notification ) ;
rc - > type = REGISTERED_CALL ;
rc - > server = server ;
rc - > tag = tag ;
rc - > cq_bound_to_call = cq_bound_to_call ;
rc - > cq_for_notification = cq_for_notification ;
@ -1188,7 +1247,16 @@ static void begin_call(grpc_server *server, call_data *calld,
}
static void done_request_event ( void * req , grpc_cq_completion * c ) {
requested_call * rc = req ;
grpc_server * server = rc - > server ;
if ( rc > = server - > requested_calls & &
rc < server - > requested_calls + server - > max_requested_calls ) {
gpr_stack_lockfree_push ( server - > request_freelist ,
rc - server - > requested_calls ) ;
} else {
gpr_free ( req ) ;
}
}
static void fail_call ( grpc_server * server , requested_call * rc ) {