@ -61,13 +61,12 @@ typedef struct server_secure_state {
grpc_server_credentials * creds ;
bool is_shutdown ;
gpr_mu mu ;
gpr_refcount refcount ;
grpc_closure destroy_closure ;
grpc_closure * destroy_callback ;
grpc_closure tcp_server_shutdown_complete ;
grpc_closure * server_destroy_listener_done ;
} server_secure_state ;
typedef struct server_secure_connect {
server_secure_state * state ;
server_secure_state * server_s tate ;
grpc_pollset * accepting_pollset ;
grpc_tcp_server_acceptor * acceptor ;
grpc_handshake_manager * handshake_mgr ;
@ -77,39 +76,28 @@ typedef struct server_secure_connect {
grpc_channel_args * args ;
} server_secure_connect ;
static void state_ref ( server_secure_state * state ) { gpr_ref ( & state - > refcount ) ; }
static void state_unref ( server_secure_state * state ) {
if ( gpr_unref ( & state - > refcount ) ) {
/* ensure all threads have unlocked */
gpr_mu_lock ( & state - > mu ) ;
gpr_mu_unlock ( & state - > mu ) ;
/* clean up */
GRPC_SECURITY_CONNECTOR_UNREF ( & state - > sc - > base , " server " ) ;
grpc_server_credentials_unref ( state - > creds ) ;
gpr_free ( state ) ;
}
}
static void on_secure_handshake_done ( grpc_exec_ctx * exec_ctx , void * statep ,
grpc_security_status status ,
grpc_endpoint * secure_endpoint ,
grpc_auth_context * auth_context ) {
server_secure_connect * state = statep ;
server_secure_connect * connection_state = statep ;
if ( status = = GRPC_SECURITY_OK ) {
if ( secure_endpoint ) {
gpr_mu_lock ( & state - > state - > mu ) ;
if ( ! state - > state - > is_shutdown ) {
gpr_mu_lock ( & connection_state - > server_state - > mu ) ;
if ( ! connection_state - > server_state - > is_shutdown ) {
grpc_transport * transport = grpc_create_chttp2_transport (
exec_ctx , grpc_server_get_channel_args ( state - > state - > server ) ,
exec_ctx , grpc_server_get_channel_args (
connection_state - > server_state - > server ) ,
secure_endpoint , 0 ) ;
grpc_arg args_to_add [ 2 ] ;
args_to_add [ 0 ] = grpc_server_credentials_to_arg ( state - > state - > creds ) ;
args_to_add [ 0 ] = grpc_server_credentials_to_arg (
connection_state - > server_state - > creds ) ;
args_to_add [ 1 ] = grpc_auth_context_to_arg ( auth_context ) ;
grpc_channel_args * args_copy = grpc_channel_args_copy_and_add (
state - > args , args_to_add , GPR_ARRAY_SIZE ( args_to_add ) ) ;
grpc_server_setup_transport ( exec_ctx , state - > state - > server , transport ,
state - > accepting_pollset , args_copy ) ;
connection_state - > args , args_to_add , GPR_ARRAY_SIZE ( args_to_add ) ) ;
grpc_server_setup_transport (
exec_ctx , connection_state - > server_state - > server , transport ,
connection_state - > accepting_pollset , args_copy ) ;
grpc_channel_args_destroy ( args_copy ) ;
grpc_chttp2_transport_start_reading ( exec_ctx , transport , NULL ) ;
} else {
@ -117,21 +105,21 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
* gone away . */
grpc_endpoint_destroy ( exec_ctx , secure_endpoint ) ;
}
gpr_mu_unlock ( & state - > state - > mu ) ;
gpr_mu_unlock ( & connection_ state- > server_ state- > mu ) ;
}
} else {
gpr_log ( GPR_ERROR , " Secure transport failed with error %d " , status ) ;
}
grpc_channel_args_destroy ( state - > args ) ;
state _unref( state - > state ) ;
gpr_free ( state ) ;
grpc_channel_args_destroy ( connection_ state- > args ) ;
grpc_tcp_server _unref( exec_ctx , connection_ state- > server_s tate - > tcp ) ;
gpr_free ( connection_ state) ;
}
static void on_handshake_done ( grpc_exec_ctx * exec_ctx , grpc_endpoint * endpoint ,
grpc_channel_args * args ,
gpr_slice_buffer * read_buffer , void * user_data ,
grpc_error * error ) {
server_secure_connect * state = user_data ;
server_secure_connect * connection_ state = user_data ;
if ( error ! = GRPC_ERROR_NONE ) {
const char * error_str = grpc_error_string ( error ) ;
gpr_log ( GPR_ERROR , " Handshaking failed: %s " , error_str ) ;
@ -139,81 +127,107 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
GRPC_ERROR_UNREF ( error ) ;
grpc_channel_args_destroy ( args ) ;
gpr_free ( read_buffer ) ;
grpc_handshake_manager_shutdown ( exec_ctx , state - > handshake_mgr ) ;
grpc_handshake_manager_destroy ( exec_ctx , state - > handshake_mgr ) ;
state _unref( state - > state ) ;
gpr_free ( state ) ;
grpc_handshake_manager_shutdown ( exec_ctx , connection_ state- > handshake_mgr ) ;
grpc_handshake_manager_destroy ( exec_ctx , connection_ state- > handshake_mgr ) ;
grpc_tcp_server _unref( exec_ctx , connection_ state- > server_s tate - > tcp ) ;
gpr_free ( connection_ state) ;
return ;
}
grpc_handshake_manager_destroy ( exec_ctx , state - > handshake_mgr ) ;
state - > handshake_mgr = NULL ;
grpc_handshake_manager_destroy ( exec_ctx , connection_ state- > handshake_mgr ) ;
connection_ state- > handshake_mgr = NULL ;
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
state - > args = args ;
connection_ state- > args = args ;
grpc_server_security_connector_do_handshake (
exec_ctx , state - > state - > sc , state - > acceptor , endpoint , read_buffer ,
state - > deadline , on_secure_handshake_done , state ) ;
exec_ctx , connection_state - > server_state - > sc , connection_state - > acceptor ,
endpoint , read_buffer , connection_state - > deadline ,
on_secure_handshake_done , connection_state ) ;
}
static void on_accept ( grpc_exec_ctx * exec_ctx , void * statep , grpc_endpoint * tcp ,
grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ) {
server_secure_connect * state = gpr_malloc ( sizeof ( * state ) ) ;
state - > state = statep ;
state_ref ( state - > state ) ;
state - > accepting_pollset = accepting_pollset ;
state - > acceptor = acceptor ;
state - > handshake_mgr = grpc_handshake_manager_create ( ) ;
server_secure_state * server_state = statep ;
server_secure_connect * connection_state = NULL ;
gpr_mu_lock ( & server_state - > mu ) ;
if ( server_state - > is_shutdown ) {
gpr_mu_unlock ( & server_state - > mu ) ;
grpc_endpoint_destroy ( exec_ctx , tcp ) ;
return ;
}
gpr_mu_unlock ( & server_state - > mu ) ;
grpc_tcp_server_ref ( server_state - > tcp ) ;
connection_state = gpr_malloc ( sizeof ( * connection_state ) ) ;
connection_state - > server_state = server_state ;
connection_state - > accepting_pollset = accepting_pollset ;
connection_state - > acceptor = acceptor ;
connection_state - > handshake_mgr = grpc_handshake_manager_create ( ) ;
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
state - > deadline = gpr_time_add ( gpr_now ( GPR_CLOCK_MONOTONIC ) ,
gpr_time_from_seconds ( 120 , GPR_TIMESPAN ) ) ;
connection_ state- > deadline = gpr_time_add (
gpr_now ( GPR_CLOCK_MONOTONIC ) , gpr_time_from_seconds ( 120 , GPR_TIMESPAN ) ) ;
grpc_handshake_manager_do_handshake (
exec_ctx , state - > handshake_mgr , tcp ,
grpc_server_get_channel_args ( state - > state - > server ) , state - > deadline ,
acceptor , on_handshake_done , state ) ;
exec_ctx , connection_state - > handshake_mgr , tcp ,
grpc_server_get_channel_args ( connection_state - > server_state - > server ) ,
connection_state - > deadline , acceptor , on_handshake_done ,
connection_state ) ;
}
/* Server callback: start listening on our ports */
static void start ( grpc_exec_ctx * exec_ctx , grpc_server * server , void * statep ,
grpc_pollset * * pollsets , size_t pollset_count ) {
server_secure_state * state = statep ;
grpc_tcp_server_start ( exec_ctx , state - > tcp , pollsets , pollset_count ,
on_accept , state ) ;
static void server_start_listener ( grpc_exec_ctx * exec_ctx , grpc_server * server ,
void * statep , grpc_pollset * * pollsets ,
size_t pollset_count ) {
server_secure_state * server_state = statep ;
gpr_mu_lock ( & server_state - > mu ) ;
server_state - > is_shutdown = false ;
gpr_mu_unlock ( & server_state - > mu ) ;
grpc_tcp_server_start ( exec_ctx , server_state - > tcp , pollsets , pollset_count ,
on_accept , server_state ) ;
}
static void destroy_done ( grpc_exec_ctx * exec_ctx , void * statep ,
static void tcp_server_shutdown_complet e( grpc_exec_ctx * exec_ctx , void * statep ,
grpc_error * error ) {
server_secure_state * state = statep ;
if ( state - > destroy_callback ! = NULL ) {
state - > destroy_callback - > cb ( exec_ctx , state - > destroy_callback - > cb_arg ,
GRPC_ERROR_REF ( error ) ) ;
server_secure_state * server_state = statep ;
/* ensure all threads have unlocked */
gpr_mu_lock ( & server_state - > mu ) ;
grpc_closure * destroy_done = server_state - > server_destroy_listener_done ;
GPR_ASSERT ( server_state - > is_shutdown ) ;
gpr_mu_unlock ( & server_state - > mu ) ;
/* clean up */
grpc_server_security_connector_shutdown ( exec_ctx , server_state - > sc ) ;
/* Flush queued work before a synchronous unref. */
grpc_exec_ctx_flush ( exec_ctx ) ;
GRPC_SECURITY_CONNECTOR_UNREF ( & server_state - > sc - > base , " server " ) ;
grpc_server_credentials_unref ( server_state - > creds ) ;
if ( destroy_done ! = NULL ) {
destroy_done - > cb ( exec_ctx , destroy_done - > cb_arg , GRPC_ERROR_REF ( error ) ) ;
grpc_exec_ctx_flush ( exec_ctx ) ;
}
grpc_server_security_connector_shutdown ( exec_ctx , state - > sc ) ;
state_unref ( state ) ;
gpr_free ( server_state ) ;
}
/* Server callback: destroy the tcp listener (so we don't generate further
callbacks ) */
static void destroy ( grpc_exec_ctx * exec_ctx , grpc_server * server , void * statep ,
static void server_destroy_listener ( grpc_exec_ctx * exec_ctx ,
grpc_server * server , void * statep ,
grpc_closure * callback ) {
server_secure_state * state = statep ;
server_secure_state * server_s tate = statep ;
grpc_tcp_server * tcp ;
gpr_mu_lock ( & state - > mu ) ;
state - > is_shutdown = true ;
state - > destroy_callback = callback ;
tcp = state - > tcp ;
gpr_mu_unlock ( & state - > mu ) ;
gpr_mu_lock ( & server_s tate - > mu ) ;
server_s tate - > is_shutdown = true ;
server_s tate - > server_destroy_listener_done = callback ;
tcp = server_s tate - > tcp ;
gpr_mu_unlock ( & server_s tate - > mu ) ;
grpc_tcp_server_shutdown_listeners ( exec_ctx , tcp ) ;
grpc_tcp_server_unref ( exec_ctx , tcp ) ;
grpc_tcp_server_unref ( exec_ctx , server_state - > tcp ) ;
}
int grpc_server_add_secure_http2_port ( grpc_server * server , const char * addr ,
grpc_server_credentials * creds ) {
grpc_resolved_addresses * resolved = NULL ;
grpc_tcp_server * tcp = NULL ;
server_secure_state * state = NULL ;
server_secure_state * server_s tate = NULL ;
size_t i ;
size_t count = 0 ;
int port_num = - 1 ;
@ -253,22 +267,22 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
if ( err ! = GRPC_ERROR_NONE ) {
goto error ;
}
state = gpr_malloc ( sizeof ( * state ) ) ;
memset ( state , 0 , sizeof ( * state ) ) ;
grpc_closure_init ( & state - > destroy_closure , destroy_done , state ) ;
err = grpc_tcp_server_create ( & state - > destroy_closure ,
server_state = gpr_malloc ( sizeof ( * server_state ) ) ;
memset ( server_state , 0 , sizeof ( * server_state ) ) ;
grpc_closure_init ( & server_state - > tcp_server_shutdown_complete ,
tcp_server_shutdown_complete , server_state ) ;
err = grpc_tcp_server_create ( & server_state - > tcp_server_shutdown_complete ,
grpc_server_get_channel_args ( server ) , & tcp ) ;
if ( err ! = GRPC_ERROR_NONE ) {
goto error ;
}
state - > server = server ;
state - > tcp = tcp ;
state - > sc = sc ;
state - > creds = grpc_server_credentials_ref ( creds ) ;
state - > is_shutdown = false ;
gpr_mu_init ( & state - > mu ) ;
gpr_ref_init ( & state - > refcount , 1 ) ;
server_state - > server = server ;
server_state - > tcp = tcp ;
server_state - > sc = sc ;
server_state - > creds = grpc_server_credentials_ref ( creds ) ;
server_state - > is_shutdown = true ;
gpr_mu_init ( & server_state - > mu ) ;
errors = gpr_malloc ( sizeof ( * errors ) * resolved - > naddrs ) ;
for ( i = 0 ; i < resolved - > naddrs ; i + + ) {
@ -311,7 +325,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_resolved_addresses_destroy ( resolved ) ;
/* Register with the server only upon success */
grpc_server_add_listener ( & exec_ctx , server , state , start , destroy ) ;
grpc_server_add_listener ( & exec_ctx , server , server_state ,
server_start_listener , server_destroy_listener ) ;
grpc_exec_ctx_finish ( & exec_ctx ) ;
return port_num ;
@ -332,10 +347,11 @@ error:
grpc_tcp_server_unref ( & exec_ctx , tcp ) ;
} else {
if ( sc ) {
grpc_exec_ctx_flush ( & exec_ctx ) ;
GRPC_SECURITY_CONNECTOR_UNREF ( & sc - > base , " server " ) ;
}
if ( state ) {
gpr_free ( state ) ;
if ( server_s tate ) {
gpr_free ( server_s tate ) ;
}
}
grpc_exec_ctx_finish ( & exec_ctx ) ;