@ -102,7 +102,7 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server . */
grpc_tcp_serve r * grpc_tcp_server_create ( grpc_closure * shutdown_complete ) {
grpc_erro r * grpc_tcp_server_create ( grpc_closure * shutdown_complete , grpc_tcp_server * * server ) {
grpc_tcp_server * s = gpr_malloc ( sizeof ( grpc_tcp_server ) ) ;
gpr_ref_init ( & s - > refs , 1 ) ;
gpr_mu_init ( & s - > mu ) ;
@ -114,12 +114,13 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
s - > shutdown_starting . head = NULL ;
s - > shutdown_starting . tail = NULL ;
s - > shutdown_complete = shutdown_complete ;
return s ;
* server = s ;
return GRPC_ERROR_NONE ;
}
static void finish_shutdown ( grpc_exec_ctx * exec_ctx , grpc_tcp_server * s ) {
if ( s - > shutdown_complete ! = NULL ) {
grpc_exec_ctx_enqueue ( exec_ctx , s - > shutdown_complete , true , NULL ) ;
grpc_exec_ctx_push ( exec_ctx , s - > shutdown_complete , GRPC_ERROR_NONE , NULL ) ;
}
/* Now that the accepts have been aborted, we can destroy the sockets.
@ -143,7 +144,7 @@ grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_starting_add ( grpc_tcp_server * s ,
grpc_closure * shutdown_starting ) {
gpr_mu_lock ( & s - > mu ) ;
grpc_closure_list_ad d ( & s - > shutdown_starting , shutdown_starting , 1 ) ;
grpc_closure_list_appen d ( & s - > shutdown_starting , shutdown_starting , GRPC_ERROR_NONE ) ;
gpr_mu_unlock ( & s - > mu ) ;
}
@ -187,51 +188,45 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket ( SOCKET sock , const struct sockaddr * addr ,
size_t addr_len ) {
static grpc_error * prepare_socket ( SOCKET sock , const struct sockaddr * addr ,
size_t addr_len , int * port ) {
struct sockaddr_storage sockname_temp ;
socklen_t sockname_len ;
grpc_error * error = GRPC_ERROR_NONE ;
if ( sock = = INVALID_SOCKET ) goto error ;
if ( ! grpc_tcp_prepare_socket ( sock ) ) {
char * utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
gpr_log ( GPR_ERROR , " Unable to prepare socket: %s " , utf8_message ) ;
gpr_free ( utf8_message ) ;
goto error ;
error = grpc_tcp_prepare_socket ( sock ) ;
if ( error ! = GRPC_ERROR_NONE ) {
goto failure ;
}
if ( bind ( sock , addr , ( int ) addr_len ) = = SOCKET_ERROR ) {
char * addr_str ;
char * utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
grpc_sockaddr_to_string ( & addr_str , addr , 0 ) ;
gpr_log ( GPR_ERROR , " bind addr=%s: %s " , addr_str , utf8_message ) ;
gpr_free ( utf8_message ) ;
gpr_free ( addr_str ) ;
goto error ;
error = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " bind " ) ;
goto failure ;
}
if ( listen ( sock , SOMAXCONN ) = = SOCKET_ERROR ) {
char * utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
gpr_log ( GPR_ERROR , " listen: %s " , utf8_message ) ;
gpr_free ( utf8_message ) ;
goto error ;
error = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " listen " ) ;
goto failure ;
}
sockname_len = sizeof ( sockname_temp ) ;
if ( getsockname ( sock , ( struct sockaddr * ) & sockname_temp , & sockname_len ) = =
SOCKET_ERROR ) {
char * utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
gpr_log ( GPR_ERROR , " getsockname: %s " , utf8_message ) ;
gpr_free ( utf8_message ) ;
goto error ;
error = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " getsockname " ) ;
goto failure ;
}
return grpc_sockaddr_get_port ( ( struct sockaddr * ) & sockname_temp ) ;
* port = grpc_sockaddr_get_port ( ( struct sockaddr * ) & sockname_temp ) ;
return GRPC_ERROR_NONE ;
error :
failure :
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
char * tgtaddr = grpc_sockaddr_to_uri ( addr ) ;
grpc_error * final_error = grpc_error_set_int ( grpc_error_set_str ( GRPC_ERROR_CREATE_REFERENCING ( " Failed to prepare server socket " , & error , 1 ) , GRPC_ERROR_STR_TARGET_ADDRESS , tgtaddr ) , GRPC_ERROR_INT_FD , ( intptr_t ) sock ) ;
gpr_free ( tgtaddr ) ;
GRPC_ERROR_UNREF ( error ) ;
if ( sock ! = INVALID_SOCKET ) closesocket ( sock ) ;
return - 1 ;
return error ;
}
static void decrement_active_ports_and_notify ( grpc_exec_ctx * exec_ctx ,
@ -251,26 +246,22 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx,
/* In order to do an async accept, we need to create a socket first which
will be the one assigned to the new incoming connection . */
static void start_accept ( grpc_exec_ctx * exec_ctx , grpc_tcp_listener * port ) {
static grpc_error * start_accept ( grpc_exec_ctx * exec_ctx , grpc_tcp_listener * port ) {
SOCKET sock = INVALID_SOCKET ;
char * message ;
char * utf8_message ;
BOOL success ;
DWORD addrlen = sizeof ( struct sockaddr_in6 ) + 16 ;
DWORD bytes_received = 0 ;
grpc_error * error = GRPC_ERROR_NONE ;
sock = WSASocket ( AF_INET6 , SOCK_STREAM , IPPROTO_TCP , NULL , 0 ,
WSA_FLAG_OVERLAPPED ) ;
if ( sock = = INVALID_SOCKET ) {
message = " Unable to create socket: %s " ;
error = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " WSASocket " ) ;
goto failure ;
}
if ( ! grpc_tcp_prepare_socket ( sock ) ) {
message = " Unable to prepare socket: %s " ;
goto failure ;
}
error = grpc_tcp_prepare_socket ( sock ) ;
if ( error ! = GRPC_ERROR_NONE ) goto failure ;
/* Start the "accept" asynchronously. */
success = port - > AcceptEx ( port - > socket - > socket , sock , port - > addresses , 0 ,
@ -280,9 +271,9 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
/* It is possible to get an accept immediately without delay. However, we
will still get an IOCP notification for it . So let ' s just ignore it . */
if ( ! success ) {
int error = WSAGetLastError ( ) ;
if ( error ! = ERROR_IO_PENDING ) {
message = " AcceptEx failed: %s " ;
int last_ error = WSAGetLastError ( ) ;
if ( last_ error ! = ERROR_IO_PENDING ) {
error = GRPC_WSA_ERROR ( last_error , " AcceptEx " ) ;
goto failure ;
}
}
@ -291,9 +282,10 @@ static void start_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *port) {
immediately process an accept that happened in the meantime . */
port - > new_socket = sock ;
grpc_socket_notify_on_read ( exec_ctx , port - > socket , & port - > on_accept ) ;
return ;
return error ;
failure :
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
if ( port - > shutting_down ) {
/* We are abandoning the listener port, take that into account to prevent
occasional hangs on shutdown . The hang happens when sp - > shutting_down
@ -301,16 +293,15 @@ failure:
but we fail there because the listening port has been closed in the
meantime . */
decrement_active_ports_and_notify ( exec_ctx , port ) ;
return ;
GRPC_ERROR_UNREF ( error ) ;
return GRPC_ERROR_NONE ;
}
utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
gpr_log ( GPR_ERROR , message , utf8_message ) ;
gpr_free ( utf8_message ) ;
if ( sock ! = INVALID_SOCKET ) closesocket ( sock ) ;
return error ;
}
/* Event manager callback when reads are ready. */
static void on_accept ( grpc_exec_ctx * exec_ctx , void * arg , bool from_iocp ) {
static void on_accept ( grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) {
grpc_tcp_listener * sp = arg ;
grpc_tcp_server_acceptor acceptor = { sp - > server , sp - > port_index , 0 } ;
SOCKET sock = sp - > new_socket ;
@ -328,7 +319,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
/* The general mechanism for shutting down is to queue abortion calls. While
this is necessary in the read / write case , it ' s useless for the accept
case . We only need to adjust the pending callback count */
if ( ! from_iocp ) {
if ( error ! = GRPC_ERROR_NONE ) {
const char * msg = grpc_error_string ( error ) ;
gpr_log ( GPR_INFO , " Skipping on_accept due to error: %s " , msg ) ;
grpc_error_free_string ( msg ) ;
return ;
}
@ -386,21 +380,20 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
the former socked we created has now either been destroy or assigned
to the new connection . We need to create a new one for the next
connection . */
start_accept ( exec_ctx , sp ) ;
GPR_ASSERT ( GRPC_LOG_IF_ERROR ( " start_accept " , start_accept ( exec_ctx , sp ) ) ) ;
}
static grpc_tcp_listen er * add_socket_to_server ( grpc_tcp_server * s , SOCKET sock ,
static grpc_erro r * add_socket_to_server ( grpc_tcp_server * s , SOCKET sock ,
const struct sockaddr * addr ,
size_t addr_len ,
unsigned port_index ) {
unsigned port_index , grpc_tcp_listener * * listener ) {
grpc_tcp_listener * sp = NULL ;
int port ;
int status ;
GUID guid = WSAID_ACCEPTEX ;
DWORD ioctl_num_bytes ;
LPFN_ACCEPTEX AcceptEx ;
if ( sock = = INVALID_SOCKET ) return NULL ;
grpc_error * error = GRPC_ERROR_NONE ;
/* We need to grab the AcceptEx pointer for that port, as it may be
interface - dependent . We ' ll cache it to avoid doing that again . */
@ -416,8 +409,12 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
return NULL ;
}
port = prepare_socket ( sock , addr , addr_len ) ;
if ( port > = 0 ) {
error = prepare_socket ( sock , addr , addr_len , & port ) ;
if ( error ! = GRPC_ERROR_NONE ) {
return error ;
}
GPR_ASSERT ( port > = 0 ) ;
gpr_mu_lock ( & s - > mu ) ;
GPR_ASSERT ( ! s - > on_accept_cb & & " must add ports before starting server " ) ;
sp = gpr_malloc ( sizeof ( grpc_tcp_listener ) ) ;
@ -438,13 +435,13 @@ static grpc_tcp_listener *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
grpc_closure_init ( & sp - > on_accept , on_accept , sp ) ;
GPR_ASSERT ( sp - > socket ) ;
gpr_mu_unlock ( & s - > mu ) ;
}
* listener = sp ;
return sp ;
return GRPC_ERROR_NONE ;
}
int grpc_tcp_server_add_port ( grpc_tcp_server * s , const void * addr ,
size_t addr_len ) {
grpc_error * grpc_tcp_server_add_port ( grpc_tcp_server * s , const void * addr ,
size_t addr_len , int * port ) {
grpc_tcp_listener * sp ;
SOCKET sock ;
struct sockaddr_in6 addr6_v4mapped ;
@ -452,8 +449,9 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
struct sockaddr * allocated_addr = NULL ;
struct sockaddr_storage sockname_temp ;
socklen_t sockname_len ;
int port ;
unsigned port_index = 0 ;
grpc_error * error = GRPC_ERROR_NONE ;
if ( s - > tail ! = NULL ) {
port_index = s - > tail - > port_index + 1 ;
}
@ -465,11 +463,11 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sockname_len = sizeof ( sockname_temp ) ;
if ( 0 = = getsockname ( sp - > socket - > socket ,
( struct sockaddr * ) & sockname_temp , & sockname_len ) ) {
port = grpc_sockaddr_get_port ( ( struct sockaddr * ) & sockname_temp ) ;
if ( port > 0 ) {
* port = grpc_sockaddr_get_port ( ( struct sockaddr * ) & sockname_temp ) ;
if ( * port > 0 ) {
allocated_addr = gpr_malloc ( addr_len ) ;
memcpy ( allocated_addr , addr , addr_len ) ;
grpc_sockaddr_set_port ( allocated_addr , port ) ;
grpc_sockaddr_set_port ( allocated_addr , * port ) ;
addr = allocated_addr ;
break ;
}
@ -483,8 +481,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if ( grpc_sockaddr_is_wildcard ( addr , & port ) ) {
grpc_sockaddr_make_wildcard6 ( port , & wildcard ) ;
if ( grpc_sockaddr_is_wildcard ( addr , port ) ) {
grpc_sockaddr_make_wildcard6 ( * port , & wildcard ) ;
addr = ( struct sockaddr * ) & wildcard ;
addr_len = sizeof ( wildcard ) ;
@ -493,19 +491,21 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
sock = WSASocket ( AF_INET6 , SOCK_STREAM , IPPROTO_TCP , NULL , 0 ,
WSA_FLAG_OVERLAPPED ) ;
if ( sock = = INVALID_SOCKET ) {
char * utf8_message = gpr_format_message ( WSAGetLastError ( ) ) ;
gpr_log ( GPR_ERROR , " unable to create socket: %s " , utf8_message ) ;
gpr_free ( utf8_message ) ;
error = GRPC_WSA_ERROR ( WSAGetLastError ( ) , " WSASocket " ) ;
goto done ;
}
sp = add_socket_to_server ( s , sock , addr , addr_len , port_index ) ;
error = add_socket_to_server ( s , sock , addr , addr_len , port_index , & sp ) ;
done :
gpr_free ( allocated_addr ) ;
if ( sp ) {
return sp - > port ;
} else {
return - 1 ;
if ( error ! = GRPC_ERROR_NONE ) {
grpc_error * error_out = GRPC_ERROR_CREATE_REFERENCING ( " Failed to add port to server " , & error , 1 ) ;
GRPC_ERROR_UNREF ( error ) ;
error = error_out ;
}
return error ;
}
void grpc_tcp_server_start ( grpc_exec_ctx * exec_ctx , grpc_tcp_server * s ,
@ -520,7 +520,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
s - > on_accept_cb = on_accept_cb ;
s - > on_accept_cb_arg = on_accept_cb_arg ;
for ( sp = s - > head ; sp ; sp = sp - > next ) {
start_accept ( exec_ctx , sp ) ;
GPR_ASSERT ( GRPC_LOG_IF_ERROR ( " start_accept " , start_accept ( exec_ctx , sp ) ) ) ;
s - > active_ports + + ;
}
gpr_mu_unlock ( & s - > mu ) ;