@ -44,6 +44,7 @@
# include "src/core/surface/call.h"
# include "src/core/surface/channel.h"
# include "src/core/surface/completion_queue.h"
# include "src/core/transport/metadata.h"
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
# include <grpc/support/useful.h>
@ -63,11 +64,22 @@ typedef struct channel_data channel_data;
struct channel_data {
grpc_server * server ;
grpc_channel * channel ;
grpc_mdstr * path_key ;
grpc_mdstr * authority_key ;
/* linked list of all channels on a server */
channel_data * next ;
channel_data * prev ;
} ;
typedef void ( * new_call_cb ) ( grpc_server * server , grpc_completion_queue * cq , grpc_metadata_array * initial_metadata , call_data * calld , void * user_data ) ;
typedef struct {
void * user_data ;
grpc_completion_queue * cq ;
grpc_metadata_array * initial_metadata ;
new_call_cb cb ;
} requested_call ;
struct grpc_server {
size_t channel_filter_count ;
const grpc_channel_filter * * channel_filters ;
@ -76,9 +88,9 @@ struct grpc_server {
gpr_mu mu ;
void * * tag s;
size_t ntags ;
size_t tag_cap ;
requested_call * requested_call s;
size_t requested_call_cou nt;
size_t requested_call_capacity ;
gpr_uint8 shutdown ;
gpr_uint8 have_shutdown_tag ;
@ -107,11 +119,19 @@ typedef enum {
ZOMBIED
} call_state ;
typedef struct legacy_data {
grpc_metadata_array client_metadata ;
} legacy_data ;
struct call_data {
grpc_call * call ;
call_state state ;
gpr_timespec deadline ;
grpc_mdstr * path ;
grpc_mdstr * host ;
legacy_data * legacy ;
gpr_uint8 included [ CALL_LIST_COUNT ] ;
call_link links [ CALL_LIST_COUNT ] ;
@ -179,7 +199,7 @@ static void server_unref(grpc_server *server) {
grpc_channel_args_destroy ( server - > channel_args ) ;
gpr_mu_destroy ( & server - > mu ) ;
gpr_free ( server - > channel_filters ) ;
gpr_free ( server - > tag s) ;
gpr_free ( server - > requested_call s) ;
gpr_free ( server ) ;
}
}
@ -210,62 +230,37 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback ( finish_destroy_channel , chand ) ;
}
static void queue_new_rpc ( grpc_server * server , call_data * calld , void * tag ) {
grpc_call * call = calld - > call ;
grpc_metadata_buffer * mdbuf = grpc_call_get_metadata_buffer ( call ) ;
size_t count = grpc_metadata_buffer_count ( mdbuf ) ;
grpc_metadata * elements = grpc_metadata_buffer_extract_elements ( mdbuf ) ;
const char * host = NULL ;
const char * method = NULL ;
size_t i ;
for ( i = 0 ; i < count ; i + + ) {
if ( 0 = = strcmp ( elements [ i ] . key , " :authority " ) ) {
host = elements [ i ] . value ;
} else if ( 0 = = strcmp ( elements [ i ] . key , " :path " ) ) {
method = elements [ i ] . value ;
}
}
grpc_call_internal_ref ( call ) ;
grpc_cq_end_new_rpc ( server - > cq , tag , call ,
grpc_metadata_buffer_cleanup_elements , elements , method ,
host , calld - > deadline , count , elements ) ;
}
static void start_new_rpc ( grpc_call_element * elem ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
grpc_server * server = chand - > server ;
gpr_mu_lock ( & server - > mu ) ;
if ( server - > ntags ) {
if ( server - > requested_call_count > 0 ) {
requested_call rc = server - > requested_calls [ - - server - > requested_call_count ] ;
calld - > state = ACTIVATED ;
queue_new_rpc ( server , calld , server - > tags [ - - server - > ntags ] ) ;
gpr_mu_unlock ( & server - > mu ) ;
rc . cb ( server , rc . cq , rc . initial_metadata , calld , rc . user_data ) ;
} else {
calld - > state = PENDING ;
call_list_join ( server , calld , PENDING_START ) ;
gpr_mu_unlock ( & server - > mu ) ;
}
gpr_mu_unlock ( & server - > mu ) ;
}
static void kill_zombie ( void * elem , int success ) {
grpc_call_destroy ( grpc_call_from_top_element ( elem ) ) ;
}
static void finish_rpc ( grpc_call_element * elem , int is_full_close ) {
static void stream_closed ( grpc_call_element * elem ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
gpr_mu_lock ( & chand - > server - > mu ) ;
switch ( calld - > state ) {
case ACTIVATED :
grpc_call_recv_finish ( elem , is_full_close ) ;
grpc_call_stream_closed ( elem ) ;
break ;
case PENDING :
if ( ! is_full_close ) {
grpc_call_recv_finish ( elem , is_full_close ) ;
break ;
}
call_list_remove ( chand - > server , calld , PENDING_START ) ;
/* fallthrough intended */
case NOT_STARTED :
@ -278,25 +273,56 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) {
gpr_mu_unlock ( & chand - > server - > mu ) ;
}
static void read_closed ( grpc_call_element * elem ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
gpr_mu_lock ( & chand - > server - > mu ) ;
switch ( calld - > state ) {
case ACTIVATED :
case PENDING :
grpc_call_read_closed ( elem ) ;
break ;
case NOT_STARTED :
calld - > state = ZOMBIED ;
grpc_iomgr_add_callback ( kill_zombie , elem ) ;
break ;
case ZOMBIED :
break ;
}
gpr_mu_unlock ( & chand - > server - > mu ) ;
}
static void call_op ( grpc_call_element * elem , grpc_call_element * from_elemn ,
grpc_call_op * op ) {
channel_data * chand = elem - > channel_data ;
call_data * calld = elem - > call_data ;
grpc_mdelem * md ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
switch ( op - > type ) {
case GRPC_RECV_METADATA :
grpc_call_recv_metadata ( elem , op ) ;
md = op - > data . metadata ;
if ( md - > key = = chand - > path_key ) {
calld - > path = grpc_mdstr_ref ( md - > value ) ;
grpc_mdelem_unref ( md ) ;
} else if ( md - > key = = chand - > authority_key ) {
calld - > host = grpc_mdstr_ref ( md - > value ) ;
grpc_mdelem_unref ( md ) ;
} else {
grpc_call_recv_metadata ( elem , md ) ;
}
break ;
case GRPC_RECV_END_OF_INITIAL_METADATA :
start_new_rpc ( elem ) ;
break ;
case GRPC_RECV_MESSAGE :
grpc_call_recv_message ( elem , op - > data . message , op - > done_cb ,
op - > user_data ) ;
grpc_call_recv_message ( elem , op - > data . message ) ;
op - > done_cb ( op - > user_data , GRPC_OP_OK ) ;
break ;
case GRPC_RECV_HALF_CLOSE :
finish_rpc ( elem , 0 ) ;
read_closed ( elem ) ;
break ;
case GRPC_RECV_FINISH :
finish_rpc ( elem , 1 ) ;
stream_closed ( elem ) ;
break ;
case GRPC_RECV_DEADLINE :
grpc_call_set_deadline ( elem , op - > data . deadline ) ;
@ -395,6 +421,8 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT ( ! is_last ) ;
chand - > server = NULL ;
chand - > channel = NULL ;
chand - > path_key = grpc_mdstr_from_string ( metadata_context , " :path " ) ;
chand - > authority_key = grpc_mdstr_from_string ( metadata_context , " :authority " ) ;
chand - > next = chand - > prev = chand ;
}
@ -406,6 +434,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand - > prev - > next = chand - > next ;
chand - > next = chand - > prev = chand ;
gpr_mu_unlock ( & chand - > server - > mu ) ;
grpc_mdstr_unref ( chand - > path_key ) ;
grpc_mdstr_unref ( chand - > authority_key ) ;
server_unref ( chand - > server ) ;
}
}
@ -415,16 +445,6 @@ static const grpc_channel_filter server_surface_filter = {
init_call_elem , destroy_call_elem , sizeof ( channel_data ) ,
init_channel_elem , destroy_channel_elem , " server " , } ;
static void early_terminate_requested_calls ( grpc_completion_queue * cq ,
void * * tags , size_t ntags ) {
size_t i ;
for ( i = 0 ; i < ntags ; i + + ) {
grpc_cq_end_new_rpc ( cq , tags [ i ] , NULL , do_nothing , NULL , NULL , NULL ,
gpr_inf_past , 0 , NULL ) ;
}
}
grpc_server * grpc_server_create_from_filters ( grpc_completion_queue * cq ,
grpc_channel_filter * * filters ,
size_t filter_count ,
@ -517,8 +537,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
void shutdown_internal ( grpc_server * server , gpr_uint8 have_shutdown_tag ,
void * shutdown_tag ) {
listener * l ;
void * * tag s;
size_t ntags ;
requested_call * requested_call s;
size_t requested_call_cou nt;
channel_data * * channels ;
channel_data * c ;
size_t nchannels ;
@ -547,10 +567,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
i + + ;
}
tag s = server - > tag s;
ntags = server - > ntags ;
server - > tag s = NULL ;
server - > ntags = 0 ;
requested_call s = server - > requested_call s;
requested_call_cou nt = server - > requested_call_cou nt;
server - > requested_call s = NULL ;
server - > requested_call_cou nt = 0 ;
server - > shutdown = 1 ;
server - > have_shutdown_tag = have_shutdown_tag ;
@ -579,8 +599,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_free ( channels ) ;
/* terminate all the requested calls */
early_terminate_requested_calls ( server - > cq , tags , ntags ) ;
gpr_free ( tags ) ;
for ( i = 0 ; i < requested_call_count ; i + + ) {
requested_calls [ i ] . cb ( server , requested_calls [ i ] . cq , requested_calls [ i ] . initial_metadata , NULL , requested_calls [ i ] . user_data ) ;
}
gpr_free ( requested_calls ) ;
/* Shutdown listeners */
for ( l = server - > listeners ; l ; l = l - > next ) {
@ -625,35 +647,81 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
server - > listeners = l ;
}
grpc_call_error grpc_server_request_call ( grpc_server * server , void * tag_new ) {
static grpc_call_error queue_call_request ( grpc_server * server , grpc_completion_queue * cq , grpc_metadata_array * initial_metadata , new_call_cb cb , void * user_da ta) {
call_data * calld ;
grpc_cq_begin_op ( server - > cq , NULL , GRPC_SERVER_RPC_NEW ) ;
requested_call * rc ;
gpr_mu_lock ( & server - > mu ) ;
if ( server - > shutdown ) {
gpr_mu_unlock ( & server - > mu ) ;
early_terminate_requested_calls ( server - > cq , & tag_new , 1 ) ;
cb ( server , cq , initial_metadata , NULL , user_data ) ;
return GRPC_CALL_OK ;
}
calld = call_list_remove_head ( server , PENDING_START ) ;
if ( calld ) {
GPR_ASSERT ( calld - > state = = PENDING ) ;
calld - > state = ACTIVATED ;
queue_new_rpc ( server , calld , tag_new ) ;
GPR_ASSERT ( calld - > state = = PENDING ) ;
gpr_mu_unlock ( & server - > mu ) ;
cb ( server , cq , initial_metadata , calld , user_data ) ;
return GRPC_CALL_OK ;
} else {
if ( server - > tag_cap = = server - > ntags ) {
server - > tag_cap = GPR_MAX ( 3 * server - > tag_cap / 2 , server - > tag_cap + 1 ) ;
server - > tags =
gpr_realloc ( server - > tags , sizeof ( void * ) * server - > tag_cap ) ;
if ( server - > requested_call_count = = server - > requested_call_capacity ) {
server - > requested_call_capacity = GPR_MAX ( server - > requested_call_capacity + 8 , server - > requested_call_capacity * 2 ) ;
server - > reques ted_c alls = gpr_realloc ( server - > requested_calls ,
sizeof ( requested_call ) * server - > requested_call_capacity ) ;
}
server - > tags [ server - > ntags + + ] = tag_new ;
rc = & server - > requested_calls [ server - > requested_call_count + + ] ;
rc - > cb = cb ;
rc - > cq = cq ;
rc - > user_data = user_data ;
rc - > initial_metadata = initial_metadata ;
gpr_mu_unlock ( & server - > mu ) ;
return GRPC_CALL_OK ;
}
gpr_mu_unlock ( & server - > mu ) ;
}
static void begin_request ( grpc_server * server , grpc_completion_queue * cq , grpc_metadata_array * initial_metadata , call_data * call_data , void * tag ) {
abort ( ) ;
}
grpc_call_error grpc_server_request_call (
grpc_server * server , grpc_completion_queue * cq , grpc_call_details * details ,
grpc_metadata_array * initial_metadata , void * tag ) {
grpc_cq_begin_op ( cq , NULL , GRPC_IOREQ ) ;
return queue_call_request ( server , cq , initial_metadata , begin_request , tag ) ;
}
static void publish_legacy_request ( grpc_call * call , grpc_op_error status , void * tag ) {
grpc_call_element * elem = grpc_call_stack_element ( grpc_call_get_call_stack ( call ) , 0 ) ;
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
grpc_server * server = chand - > server ;
return GRPC_CALL_OK ;
if ( status = = GRPC_OP_OK ) {
grpc_cq_end_new_rpc ( server - > cq , tag , call ,
do_nothing , NULL , grpc_mdstr_as_c_string ( calld - > path ) ,
grpc_mdstr_as_c_string ( calld - > host ) , calld - > deadline ,
calld - > legacy - > client_metadata . count , calld - > legacy - > client_metadata . metadata ) ;
}
}
static void begin_legacy_request ( grpc_server * server , grpc_completion_queue * cq , grpc_metadata_array * initial_metadata , call_data * calld , void * tag ) {
grpc_ioreq req ;
if ( ! calld ) {
gpr_free ( initial_metadata ) ;
grpc_cq_end_new_rpc ( cq , tag , NULL , do_nothing , NULL , NULL , NULL ,
gpr_inf_past , 0 , NULL ) ;
return ;
}
req . op = GRPC_IOREQ_RECV_INITIAL_METADATA ;
req . data . recv_metadata = initial_metadata ;
grpc_call_start_ioreq_and_call_back ( calld - > call , & req , 1 , publish_legacy_request , tag ) ;
}
grpc_call_error grpc_server_request_call_old ( grpc_server * server , void * tag_new ) {
grpc_metadata_array * client_metadata = gpr_malloc ( sizeof ( grpc_metadata_array ) ) ;
memset ( client_metadata , 0 , sizeof ( * client_metadata ) ) ;
grpc_cq_begin_op ( server - > cq , NULL , GRPC_SERVER_RPC_NEW ) ;
return queue_call_request ( server , server - > cq , client_metadata , begin_legacy_request , tag_new ) ;
}
const grpc_channel_args * grpc_server_get_channel_args ( grpc_server * server ) {