@ -90,98 +90,211 @@ class Chttp2ServerListener : public Server::ListenerInterface {
class ConfigFetcherWatcher
: public grpc_server_config_fetcher : : WatcherInterface {
public :
explicit ConfigFetcherWatcher ( Chttp2ServerListener * listener )
: listener_ ( listener ) { }
void UpdateConfig ( grpc_channel_args * args ) override {
{
MutexLock lock ( & listener_ - > mu_ ) ;
grpc_channel_args_destroy ( listener_ - > args_ ) ;
grpc_error * error = GRPC_ERROR_NONE ;
args = listener_ - > args_modifier_ ( args , & error ) ;
if ( error ! = GRPC_ERROR_NONE ) {
// TODO(yashykt): Set state to close down connections immediately
// after accepting.
GPR_ASSERT ( 0 ) ;
}
listener_ - > args_ = args ;
if ( ! listener_ - > shutdown_ ) return ; // Already started listening.
}
int port_temp ;
grpc_error * error = grpc_tcp_server_add_port (
listener_ - > tcp_server_ , & listener_ - > resolved_address_ , & port_temp ) ;
if ( error ! = GRPC_ERROR_NONE ) {
GRPC_ERROR_UNREF ( error ) ;
gpr_log ( GPR_ERROR , " Error adding port to server: %s " ,
grpc_error_string ( error ) ) ;
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT ( 0 ) ;
}
listener_ - > StartListening ( ) ;
}
explicit ConfigFetcherWatcher ( RefCountedPtr < Chttp2ServerListener > listener )
: listener_ ( std : : move ( listener ) ) { }
void UpdateConfig ( grpc_channel_args * args ) override ;
void StopServing ( ) override ;
private :
Chttp2ServerListener * listener_ ;
RefCountedPtr < Chttp2ServerListener > listener_ ;
} ;
class ConnectionState : public RefCounted < ConnectionState > {
class ActiveConnection : public InternallyRefCounted < ActiveConnection > {
public :
ConnectionState ( Chttp2ServerListener * listener ,
grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ,
RefCountedPtr < HandshakeManager > handshake_mgr ,
grpc_channel_args * args , grpc_endpoint * endpoint ) ;
class HandshakingState : public InternallyRefCounted < HandshakingState > {
public :
HandshakingState ( RefCountedPtr < ActiveConnection > connection_ref ,
grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ,
grpc_channel_args * args ) ;
~ HandshakingState ( ) override ;
void Orphan ( ) override ;
~ ConnectionState ( ) override ;
void Start ( grpc_endpoint * endpoint , grpc_channel_args * args ) ;
// Needed to be able to grab an external ref in ActiveConnection::Start()
using InternallyRefCounted < HandshakingState > : : Ref ;
private :
static void OnTimeout ( void * arg , grpc_error * error ) ;
static void OnReceiveSettings ( void * arg , grpc_error * /* error */ ) ;
static void OnHandshakeDone ( void * arg , grpc_error * error ) ;
RefCountedPtr < ActiveConnection > const connection_ ;
grpc_pollset * const accepting_pollset_ ;
grpc_tcp_server_acceptor * const acceptor_ ;
RefCountedPtr < HandshakeManager > handshake_mgr_
ABSL_GUARDED_BY ( & connection_ - > mu_ ) ;
// State for enforcing handshake timeout on receiving HTTP/2 settings.
grpc_millis const deadline_ ;
grpc_timer timer_ ABSL_GUARDED_BY ( & connection_ - > mu_ ) ;
grpc_closure on_timeout_ ABSL_GUARDED_BY ( & connection_ - > mu_ ) ;
grpc_closure on_receive_settings_ ABSL_GUARDED_BY ( & connection_ - > mu_ ) ;
grpc_pollset_set * const interested_parties_ ;
} ;
ActiveConnection ( grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ,
grpc_channel_args * args ) ;
~ ActiveConnection ( ) override ;
void Orphan ( ) override ;
void SendGoAway ( ) ;
void Start ( RefCountedPtr < Chttp2ServerListener > listener ,
grpc_endpoint * endpoint , grpc_channel_args * args ) ;
// Needed to be able to grab an external ref in
// Chttp2ServerListener::OnAccept()
using InternallyRefCounted < ActiveConnection > : : Ref ;
private :
static void OnTimeout ( void * arg , grpc_error * error ) ;
static void OnReceiveSettings ( void * arg , grpc_error * error ) ;
static void OnHandshakeDone ( void * arg , grpc_error * error ) ;
Chttp2ServerListener * const listener_ ;
grpc_pollset * const accepting_pollset_ ;
grpc_tcp_server_acceptor * const acceptor_ ;
RefCountedPtr < HandshakeManager > handshake_mgr_ ;
// State for enforcing handshake timeout on receiving HTTP/2 settings.
grpc_chttp2_transport * transport_ = nullptr ;
grpc_millis deadline_ ;
grpc_timer timer_ ;
grpc_closure on_timeout_ ;
grpc_closure on_receive_settings_ ;
grpc_pollset_set * const interested_parties_ ;
static void OnClose ( void * arg , grpc_error * error ) ;
RefCountedPtr < Chttp2ServerListener > listener_ ;
Mutex mu_ ABSL_ACQUIRED_AFTER ( & listener_ - > mu_ ) ;
// Set by HandshakingState before the handshaking begins and reset when
// handshaking is done.
OrphanablePtr < HandshakingState > handshaking_state_ ABSL_GUARDED_BY ( & mu_ ) ;
// Set by HandshakingState when handshaking is done and a valid transport is
// created.
grpc_chttp2_transport * transport_ ABSL_GUARDED_BY ( & mu_ ) = nullptr ;
grpc_closure on_close_ ;
bool shutdown_ ABSL_GUARDED_BY ( & mu_ ) = false ;
} ;
// To allow access to RefCounted<> like interface.
friend class RefCountedPtr < Chttp2ServerListener > ;
// Should only be called once so as to start the TCP server.
void StartListening ( ) ;
static void OnAccept ( void * arg , grpc_endpoint * tcp ,
grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ) ;
RefCountedPtr < HandshakeManager > CreateHandshakeManager ( ) ;
static void TcpServerShutdownComplete ( void * arg , grpc_error * error ) ;
static void DestroyListener ( Server * /*server*/ , void * arg ,
grpc_closure * destroy_done ) ;
// The interface required by RefCountedPtr<> has been manually implemented
// here to take a ref on tcp_server_ instead. Note that, the handshaker needs
// tcp_server_ to exist for the lifetime of the handshake since it's needed by
// acceptor. Sharing refs between the listener and tcp_server_ is just an
// optimization to avoid taking additional refs on the listener, since
// TcpServerShutdownComplete already holds a ref to the listener.
void IncrementRefCount ( ) { grpc_tcp_server_ref ( tcp_server_ ) ; }
void IncrementRefCount ( const DebugLocation & /* location */ ,
const char * /* reason */ ) {
IncrementRefCount ( ) ;
}
RefCountedPtr < Chttp2ServerListener > Ref ( ) GRPC_MUST_USE_RESULT {
IncrementRefCount ( ) ;
return RefCountedPtr < Chttp2ServerListener > ( this ) ;
}
RefCountedPtr < Chttp2ServerListener > Ref ( const DebugLocation & /* location */ ,
const char * /* reason */ )
GRPC_MUST_USE_RESULT {
return Ref ( ) ;
}
void Unref ( ) { grpc_tcp_server_unref ( tcp_server_ ) ; }
void Unref ( const DebugLocation & /* location */ , const char * /* reason */ ) {
Unref ( ) ;
}
Server * const server_ ;
grpc_tcp_server * tcp_server_ ;
grpc_resolved_address resolved_address_ ;
Chttp2ServerArgsModifier args_modifier_ ;
Mutex mu_ ;
grpc_channel_args * args_ ; // guarded by mu_
Chttp2ServerArgsModifier const args_modifier_ ;
ConfigFetcherWatcher * config_fetcher_watcher_ = nullptr ;
bool shutdown_ = true ;
grpc_closure tcp_server_shutdown_complete_ ;
grpc_closure * on_destroy_done_ = nullptr ;
HandshakeManager * pending_handshake_mgrs_ = nullptr ;
Mutex channel_args_mu_ ;
grpc_channel_args * args_ ABSL_GUARDED_BY ( channel_args_mu_ ) ;
Mutex mu_ ;
// Signals whether grpc_tcp_server_start() has been called.
bool started_ ABSL_GUARDED_BY ( mu_ ) = false ;
// Signals whether grpc_tcp_server_start() has completed.
CondVar started_cv_ ABSL_GUARDED_BY ( mu_ ) ;
// Signals whether new requests/connections are to be accepted.
bool is_serving_ ABSL_GUARDED_BY ( mu_ ) = false ;
// Signals whether the application has triggered shutdown.
bool shutdown_ ABSL_GUARDED_BY ( mu_ ) = false ;
std : : map < ActiveConnection * , OrphanablePtr < ActiveConnection > > connections_
ABSL_GUARDED_BY ( mu_ ) ;
grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY ( mu_ ) ;
grpc_closure * on_destroy_done_ ABSL_GUARDED_BY ( mu_ ) = nullptr ;
RefCountedPtr < channelz : : ListenSocketNode > channelz_listen_socket_ ;
} ;
//
// Chttp2ServerListener::ConnectionState
// Chttp2ServerListener::ConfigFetcherWatcher
//
void Chttp2ServerListener : : ConfigFetcherWatcher : : UpdateConfig (
grpc_channel_args * args ) {
grpc_error * error = GRPC_ERROR_NONE ;
args = listener_ - > args_modifier_ ( args , & error ) ;
if ( error ! = GRPC_ERROR_NONE ) {
// TODO(yashykt): Set state to close down connections immediately
// after accepting.
GPR_ASSERT ( 0 ) ;
}
grpc_channel_args * args_to_destroy = nullptr ;
{
MutexLock lock ( & listener_ - > channel_args_mu_ ) ;
args_to_destroy = listener_ - > args_ ;
listener_ - > args_ = args ;
}
grpc_channel_args_destroy ( args_to_destroy ) ;
{
MutexLock lock ( & listener_ - > mu_ ) ;
if ( listener_ - > shutdown_ ) {
return ;
}
listener_ - > is_serving_ = true ;
if ( listener_ - > started_ ) return ;
}
int port_temp ;
error = grpc_tcp_server_add_port ( listener_ - > tcp_server_ ,
& listener_ - > resolved_address_ , & port_temp ) ;
if ( error ! = GRPC_ERROR_NONE ) {
GRPC_ERROR_UNREF ( error ) ;
gpr_log ( GPR_ERROR , " Error adding port to server: %s " ,
grpc_error_string ( error ) ) ;
// TODO(yashykt): We wouldn't need to assert here if we bound to the
// port earlier during AddPort.
GPR_ASSERT ( 0 ) ;
}
listener_ - > StartListening ( ) ;
{
MutexLock lock ( & listener_ - > mu_ ) ;
listener_ - > started_ = true ;
listener_ - > started_cv_ . SignalAll ( ) ;
}
}
void Chttp2ServerListener : : ConfigFetcherWatcher : : StopServing ( ) {
std : : map < ActiveConnection * , OrphanablePtr < ActiveConnection > > connections ;
{
MutexLock lock ( & listener_ - > mu_ ) ;
listener_ - > is_serving_ = false ;
connections = std : : move ( listener_ - > connections_ ) ;
}
// Send GOAWAYs on the transports so that they disconnected when existing RPCs
// finish.
for ( auto & connection : connections ) {
connection . first - > SendGoAway ( ) ;
}
}
//
// Chttp2ServerListener::ActiveConnection::HandshakingState
//
grpc_millis GetConnectionDeadline ( const grpc_channel_args * args ) {
@ -191,73 +304,96 @@ grpc_millis GetConnectionDeadline(const grpc_channel_args* args) {
return ExecCtx : : Get ( ) - > Now ( ) + timeout_ms ;
}
Chttp2ServerListener : : ConnectionState : : ConnectionState (
Chttp2ServerListener * listener , grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ,
RefCountedPtr < HandshakeManager > handshake_mgr , grpc_channel_args * args ,
grpc_endpoint * endpoint )
: listener_ ( listener ) ,
Chttp2ServerListener : : ActiveConnection : : HandshakingState : : HandshakingState (
RefCountedPtr < ActiveConnection > connection_ref ,
grpc_pollset * accepting_pollset , grpc_tcp_server_acceptor * acceptor ,
grpc_channel_args * args )
: connection_ ( std : : move ( connection_ref ) ) ,
accepting_pollset_ ( accepting_pollset ) ,
acceptor_ ( acceptor ) ,
handshake_mgr_ ( std : : move ( handshake_mgr ) ) ,
handshake_mgr_ ( MakeRefCounted < HandshakeManager > ( ) ) ,
deadline_ ( GetConnectionDeadline ( args ) ) ,
interested_parties_ ( grpc_pollset_set_create ( ) ) {
grpc_pollset_set_add_pollset ( interested_parties_ , accepting_pollset_ ) ;
HandshakerRegistry : : AddHandshakers ( HANDSHAKER_SERVER , args ,
interested_parties_ , handshake_mgr_ . get ( ) ) ;
handshake_mgr_ - > DoHandshake ( endpoint , args , deadline_ , acceptor_ ,
OnHandshakeDone , this ) ;
}
Chttp2ServerListener : : ConnectionState : : ~ ConnectionState ( ) {
if ( transport_ ! = nullptr ) {
GRPC_CHTTP2_UNREF_TRANSPORT ( transport_ , " receive settings timeout " ) ;
}
Chttp2ServerListener : : ActiveConnection : : HandshakingState : : ~ HandshakingState ( ) {
grpc_pollset_set_del_pollset ( interested_parties_ , accepting_pollset_ ) ;
grpc_pollset_set_destroy ( interested_parties_ ) ;
}
void Chttp2ServerListener : : ConnectionState : : OnTimeout ( void * arg ,
grpc_error * error ) {
ConnectionState * self = static_cast < ConnectionState * > ( arg ) ;
void Chttp2ServerListener : : ActiveConnection : : HandshakingState : : Orphan ( ) {
{
MutexLock lock ( & connection_ - > mu_ ) ;
if ( handshake_mgr_ ! = nullptr ) {
handshake_mgr_ - > Shutdown (
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Listener stopped serving. " ) ) ;
}
}
Unref ( ) ;
}
void Chttp2ServerListener : : ActiveConnection : : HandshakingState : : Start (
grpc_endpoint * endpoint , grpc_channel_args * args ) {
Ref ( ) . release ( ) ; // Held by OnHandshakeDone
RefCountedPtr < HandshakeManager > handshake_mgr ;
{
MutexLock lock ( & connection_ - > mu_ ) ;
if ( handshake_mgr_ = = nullptr ) return ;
handshake_mgr = handshake_mgr_ ;
}
handshake_mgr - > DoHandshake ( endpoint , args , deadline_ , acceptor_ ,
OnHandshakeDone , this ) ;
}
void Chttp2ServerListener : : ActiveConnection : : HandshakingState : : OnTimeout (
void * arg , grpc_error * error ) {
HandshakingState * self = static_cast < HandshakingState * > ( arg ) ;
// Note that we may be called with GRPC_ERROR_NONE when the timer fires
// or with an error indicating that the timer system is being shut down.
if ( error ! = GRPC_ERROR_CANCELLED ) {
grpc_transport_op * op = grpc_make_transport_op ( nullptr ) ;
op - > disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" Did not receive HTTP/2 settings before handshake timeout " ) ;
grpc_transport_perform_op ( & self - > transport_ - > base , op ) ;
grpc_chttp2_transport * transport = nullptr ;
{
MutexLock lock ( & self - > connection_ - > mu_ ) ;
transport = self - > connection_ - > transport_ ;
}
grpc_transport_perform_op ( & transport - > base , op ) ;
}
self - > Unref ( ) ;
}
void Chttp2ServerListener : : ConnectionState : : OnReceiveSettings (
void * arg , grpc_error * error ) {
ConnectionState * self = static_cast < ConnectionState * > ( arg ) ;
if ( error = = GRPC_ERROR_NONE ) {
grpc_timer_cancel ( & self - > timer_ ) ;
}
void Chttp2ServerListener : : ActiveConnection : : HandshakingState : :
OnReceiveSettings ( void * arg , grpc_error * /* error */ ) {
HandshakingState * self = static_cast < HandshakingState * > ( arg ) ;
grpc_timer_cancel ( & self - > timer_ ) ;
self - > Unref ( ) ;
}
void Chttp2ServerListener : : ConnectionState : : OnHandshakeDone ( void * arg ,
grpc_error * error ) {
void Chttp2ServerListener : : Active Connection: : Handshaking State: : OnHandshakeDone (
void * arg , grpc_error * error ) {
auto * args = static_cast < HandshakerArgs * > ( arg ) ;
ConnectionState * self = static_cast < ConnectionState * > ( args - > user_data ) ;
HandshakingState * self = static_cast < HandshakingState * > ( args - > user_data ) ;
OrphanablePtr < HandshakingState > handshaking_state_ref ;
RefCountedPtr < HandshakeManager > handshake_mgr ;
bool cleanup_connection = false ;
bool free_resource_quota = false ;
grpc_resource_user * resource_user =
self - > connection_ - > listener_ - > server_ - > default_resource_user ( ) ;
{
MutexLock lock ( & self - > listener_ - > mu_ ) ;
grpc_resource_user * resource_user =
self - > listener_ - > server_ - > default_resource_user ( ) ;
if ( error ! = GRPC_ERROR_NONE | | self - > listener_ - > shutdown_ ) {
MutexLock connection_lock ( & self - > connection_ - > mu_ ) ;
if ( error ! = GRPC_ERROR_NONE | | self - > connection_ - > shutdown_ ) {
const char * error_str = grpc_error_string ( error ) ;
gpr_log ( GPR_DEBUG , " Handshaking failed: %s " , error_str ) ;
if ( resource_user ! = nullptr ) {
grpc_resource_user_free ( resource_user ,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ;
}
cleanup_connection = true ;
free_resource_quota = true ;
if ( error = = GRPC_ERROR_NONE & & args - > endpoint ! = nullptr ) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
@ -275,9 +411,11 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
if ( args - > endpoint ! = nullptr ) {
grpc_transport * transport = grpc_create_chttp2_transport (
args - > args , args - > endpoint , false , resource_user ) ;
grpc_error * channel_init_err = self - > listener_ - > server_ - > SetupTransport (
transport , self - > accepting_pollset_ , args - > args ,
grpc_chttp2_transport_get_socket_node ( transport ) , resource_user ) ;
grpc_error * channel_init_err =
self - > connection_ - > listener_ - > server_ - > SetupTransport (
transport , self - > accepting_pollset_ , args - > args ,
grpc_chttp2_transport_get_socket_node ( transport ) ,
resource_user ) ;
if ( channel_init_err = = GRPC_ERROR_NONE ) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
@ -287,18 +425,32 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self - > transport_ =
self - > connection_ - > transport_ =
reinterpret_cast < grpc_chttp2_transport * > ( transport ) ;
GRPC_CHTTP2_REF_TRANSPORT ( self - > connection_ - > transport_ ,
" ActiveConnection " ) ; // Held by connection_
self - > Ref ( ) . release ( ) ; // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT ( & self - > on_receive_settings_ , OnReceiveSettings ,
self , grpc_schedule_on_exec_ctx ) ;
// If the listener has been configured with a config fetcher, we need
// to watch on the transport being closed so that we can an updated
// list of active connections.
grpc_closure * on_close = nullptr ;
if ( self - > connection_ - > listener_ - > config_fetcher_watcher_ ! =
nullptr ) {
// Refs helds by OnClose()
self - > connection_ - > Ref ( ) . release ( ) ;
on_close = & self - > connection_ - > on_close_ ;
} else {
// Remove the connection from the connections_ map since OnClose()
// will not be invoked when a config fetcher is set.
cleanup_connection = true ;
}
grpc_chttp2_transport_start_reading ( transport , args - > read_buffer ,
& self - > on_receive_settings_ ) ;
& self - > on_receive_settings_ ,
on_close ) ;
grpc_channel_args_destroy ( args - > args ) ;
self - > Ref ( ) . release ( ) ; // Held by OnTimeout().
GRPC_CHTTP2_REF_TRANSPORT (
reinterpret_cast < grpc_chttp2_transport * > ( transport ) ,
" receive settings timeout " ) ;
GRPC_CLOSURE_INIT ( & self - > on_timeout_ , OnTimeout , self ,
grpc_schedule_on_exec_ctx ) ;
grpc_timer_init ( & self - > timer_ , self - > deadline_ , & self - > on_timeout_ ) ;
@ -310,25 +462,116 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
grpc_transport_destroy ( transport ) ;
grpc_slice_buffer_destroy_internal ( args - > read_buffer ) ;
gpr_free ( args - > read_buffer ) ;
if ( resource_user ! = nullptr ) {
grpc_resource_user_free ( resource_user ,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ;
}
cleanup_connection = true ;
free_resource_quota = true ;
grpc_channel_args_destroy ( args - > args ) ;
}
} else {
if ( resource_user ! = nullptr ) {
grpc_resource_user_free ( resource_user ,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ;
}
cleanup_connection = true ;
free_resource_quota = true ;
}
}
self - > handshake_mgr_ - > RemoveFromPendingMgrList (
& self - > listener_ - > pending_handshake_mgrs_ ) ;
// Since the handshake manager is done, the connection no longer needs to
// shutdown the handshake when the listener needs to stop serving.
// Avoid calling the destructor of HandshakeManager and HandshakingState
// from within the critical region.
handshake_mgr = std : : move ( self - > handshake_mgr_ ) ;
handshaking_state_ref = std : : move ( self - > connection_ - > handshaking_state_ ) ;
}
self - > handshake_mgr_ . reset ( ) ;
gpr_free ( self - > acceptor_ ) ;
grpc_tcp_server_unref ( self - > listener_ - > tcp_server_ ) ;
OrphanablePtr < ActiveConnection > connection ;
if ( free_resource_quota & & resource_user ! = nullptr ) {
grpc_resource_user_free ( resource_user , GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ;
}
if ( cleanup_connection ) {
MutexLock listener_lock ( & self - > connection_ - > listener_ - > mu_ ) ;
auto it = self - > connection_ - > listener_ - > connections_ . find (
self - > connection_ . get ( ) ) ;
if ( it ! = self - > connection_ - > listener_ - > connections_ . end ( ) ) {
connection = std : : move ( it - > second ) ;
self - > connection_ - > listener_ - > connections_ . erase ( it ) ;
}
}
self - > Unref ( ) ;
}
//
// Chttp2ServerListener::ActiveConnection
//
Chttp2ServerListener : : ActiveConnection : : ActiveConnection (
grpc_pollset * accepting_pollset , grpc_tcp_server_acceptor * acceptor ,
grpc_channel_args * args )
: handshaking_state_ ( MakeOrphanable < HandshakingState > (
Ref ( ) , accepting_pollset , acceptor , args ) ) {
GRPC_CLOSURE_INIT ( & on_close_ , ActiveConnection : : OnClose , this ,
grpc_schedule_on_exec_ctx ) ;
}
Chttp2ServerListener : : ActiveConnection : : ~ ActiveConnection ( ) {
if ( transport_ ! = nullptr ) {
GRPC_CHTTP2_UNREF_TRANSPORT ( transport_ , " ActiveConnection " ) ;
}
}
void Chttp2ServerListener : : ActiveConnection : : Orphan ( ) {
OrphanablePtr < HandshakingState > handshaking_state ;
{
MutexLock lock ( & mu_ ) ;
shutdown_ = true ;
// Reset handshaking_state_ since we have been orphaned by the listener
// signaling that the listener has stopped serving.
handshaking_state = std : : move ( handshaking_state_ ) ;
}
Unref ( ) ;
}
void Chttp2ServerListener : : ActiveConnection : : SendGoAway ( ) {
grpc_chttp2_transport * transport = nullptr ;
{
MutexLock lock ( & mu_ ) ;
transport = transport_ ;
}
if ( transport ! = nullptr ) {
grpc_transport_op * op = grpc_make_transport_op ( nullptr ) ;
op - > goaway_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" Server is stopping to serve requests. " ) ;
grpc_transport_perform_op ( & transport - > base , op ) ;
}
}
void Chttp2ServerListener : : ActiveConnection : : Start (
RefCountedPtr < Chttp2ServerListener > listener , grpc_endpoint * endpoint ,
grpc_channel_args * args ) {
RefCountedPtr < HandshakingState > handshaking_state_ref ;
listener_ = std : : move ( listener ) ;
{
MutexLock lock ( & mu_ ) ;
if ( shutdown_ ) return ;
// Hold a ref to HandshakingState to allow starting the handshake outside
// the critical region.
handshaking_state_ref = handshaking_state_ - > Ref ( ) ;
}
handshaking_state_ref - > Start ( endpoint , args ) ;
}
void Chttp2ServerListener : : ActiveConnection : : OnClose ( void * arg ,
grpc_error * /* error */ ) {
ActiveConnection * self = static_cast < ActiveConnection * > ( arg ) ;
OrphanablePtr < ActiveConnection > connection ;
{
MutexLock listener_lock ( & self - > listener_ - > mu_ ) ;
MutexLock connection_lock ( & self - > mu_ ) ;
// The node was already deleted from the connections_ list if the connection
// is shutdown.
if ( ! self - > shutdown_ ) {
auto it = self - > listener_ - > connections_ . find ( self ) ;
if ( it ! = self - > listener_ - > connections_ . end ( ) ) {
connection = std : : move ( it - > second ) ;
self - > listener_ - > connections_ . erase ( it ) ;
}
}
}
self - > Unref ( ) ;
}
@ -414,6 +657,13 @@ Chttp2ServerListener::Chttp2ServerListener(
}
Chttp2ServerListener : : ~ Chttp2ServerListener ( ) {
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx : : Get ( ) - > Flush ( ) ;
if ( on_destroy_done_ ! = nullptr ) {
ExecCtx : : Run ( DEBUG_LOCATION , on_destroy_done_ , GRPC_ERROR_NONE ) ;
ExecCtx : : Get ( ) - > Flush ( ) ;
}
grpc_channel_args_destroy ( args_ ) ;
}
@ -422,24 +672,27 @@ void Chttp2ServerListener::Start(
Server * /*server*/ , const std : : vector < grpc_pollset * > * /* pollsets */ ) {
if ( server_ - > config_fetcher ( ) ! = nullptr ) {
grpc_channel_args * args = nullptr ;
auto watcher = absl : : make_unique < ConfigFetcherWatcher > ( this ) ;
auto watcher = absl : : make_unique < ConfigFetcherWatcher > ( Ref ( ) ) ;
config_fetcher_watcher_ = watcher . get ( ) ;
{
MutexLock lock ( & mu_ ) ;
config_fetcher_watcher_ = watcher . get ( ) ;
MutexLock lock ( & channel_args_mu_ ) ;
args = grpc_channel_args_copy ( args_ ) ;
}
server_ - > config_fetcher ( ) - > StartWatch (
grpc_sockaddr_to_string ( & resolved_address_ , false ) , args ,
std : : move ( watcher ) ) ;
} else {
{
MutexLock lock ( & mu_ ) ;
started_ = true ;
is_serving_ = true ;
}
StartListening ( ) ;
}
}
void Chttp2ServerListener : : StartListening ( ) {
grpc_tcp_server_start ( tcp_server_ , & server_ - > pollsets ( ) , OnAccept , this ) ;
MutexLock lock ( & mu_ ) ;
shutdown_ = false ;
}
void Chttp2ServerListener : : SetOnDestroyDone ( grpc_closure * on_destroy_done ) {
@ -447,67 +700,59 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
on_destroy_done_ = on_destroy_done ;
}
RefCountedPtr < HandshakeManager > Chttp2ServerListener : : CreateHandshakeManager ( ) {
MutexLock lock ( & mu_ ) ;
if ( shutdown_ ) return nullptr ;
grpc_resource_user * resource_user = server_ - > default_resource_user ( ) ;
if ( resource_user ! = nullptr & &
! grpc_resource_user_safe_alloc ( resource_user ,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ) {
gpr_log ( GPR_ERROR ,
" Memory quota exhausted, rejecting connection, no handshaking. " ) ;
return nullptr ;
}
auto handshake_mgr = MakeRefCounted < HandshakeManager > ( ) ;
handshake_mgr - > AddToPendingMgrList ( & pending_handshake_mgrs_ ) ;
grpc_tcp_server_ref ( tcp_server_ ) ; // Ref held by ConnectionState.
return handshake_mgr ;
}
void Chttp2ServerListener : : OnAccept ( void * arg , grpc_endpoint * tcp ,
grpc_pollset * accepting_pollset ,
grpc_tcp_server_acceptor * acceptor ) {
Chttp2ServerListener * self = static_cast < Chttp2ServerListener * > ( arg ) ;
RefCountedPtr < HandshakeManager > handshake_mgr =
self - > CreateHandshakeManager ( ) ;
if ( handshake_mgr = = nullptr ) {
grpc_endpoint_shutdown ( tcp , GRPC_ERROR_NONE ) ;
grpc_endpoint_destroy ( tcp ) ;
gpr_free ( acceptor ) ;
return ;
}
grpc_channel_args * args = nullptr ;
{
MutexLock lock ( & self - > mu_ ) ;
MutexLock lock ( & self - > channel_args_mu_ ) ;
args = grpc_channel_args_copy ( self - > args_ ) ;
}
// Deletes itself when done.
new ConnectionState ( self , accepting_pollset , acceptor ,
std : : move ( handshake_mgr ) , args , tcp ) ;
auto connection =
MakeOrphanable < ActiveConnection > ( accepting_pollset , acceptor , args ) ;
// Hold a ref to connection to allow starting handshake outside the
// critical region
RefCountedPtr < ActiveConnection > connection_ref = connection - > Ref ( ) ;
RefCountedPtr < Chttp2ServerListener > listener_ref ;
{
MutexLock lock ( & self - > mu_ ) ;
// Shutdown the the connection if listener's stopped serving.
if ( ! self - > shutdown_ & & self - > is_serving_ ) {
grpc_resource_user * resource_user =
self - > server_ - > default_resource_user ( ) ;
if ( resource_user ! = nullptr & &
! grpc_resource_user_safe_alloc ( resource_user ,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE ) ) {
gpr_log (
GPR_ERROR ,
" Memory quota exhausted, rejecting connection, no handshaking. " ) ;
} else {
// This ref needs to be taken in the critical region after having made
// sure that the listener has not been Orphaned, so as to avoid
// heap-use-after-free issues where `Ref()` is invoked when the ref of
// tcp_server_ has already reached 0. (Ref() implementation of
// Chttp2ServerListener is grpc_tcp_server_ref().)
listener_ref = self - > Ref ( ) ;
self - > connections_ . emplace ( connection . get ( ) , std : : move ( connection ) ) ;
}
}
}
if ( connection ! = nullptr ) {
grpc_endpoint_shutdown ( tcp , GRPC_ERROR_NONE ) ;
grpc_endpoint_destroy ( tcp ) ;
gpr_free ( acceptor ) ;
} else {
connection_ref - > Start ( std : : move ( listener_ref ) , tcp , args ) ;
}
grpc_channel_args_destroy ( args ) ;
}
void Chttp2ServerListener : : TcpServerShutdownComplete ( void * arg ,
grpc_error * error ) {
Chttp2ServerListener * self = static_cast < Chttp2ServerListener * > ( arg ) ;
/* ensure all threads have unlocked */
grpc_closure * destroy_done = nullptr ;
{
MutexLock lock ( & self - > mu_ ) ;
destroy_done = self - > on_destroy_done_ ;
GPR_ASSERT ( self - > shutdown_ ) ;
if ( self - > pending_handshake_mgrs_ ! = nullptr ) {
self - > pending_handshake_mgrs_ - > ShutdownAllPending ( GRPC_ERROR_REF ( error ) ) ;
}
self - > channelz_listen_socket_ . reset ( ) ;
}
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx : : Get ( ) - > Flush ( ) ;
if ( destroy_done ! = nullptr ) {
ExecCtx : : Run ( DEBUG_LOCATION , destroy_done , GRPC_ERROR_REF ( error ) ) ;
ExecCtx : : Get ( ) - > Flush ( ) ;
}
self - > channelz_listen_socket_ . reset ( ) ;
GRPC_ERROR_UNREF ( error ) ;
delete self ;
}
@ -519,10 +764,20 @@ void Chttp2ServerListener::Orphan() {
if ( config_fetcher_watcher_ ! = nullptr ) {
server_ - > config_fetcher ( ) - > CancelWatch ( config_fetcher_watcher_ ) ;
}
std : : map < ActiveConnection * , OrphanablePtr < ActiveConnection > > connections ;
grpc_tcp_server * tcp_server ;
{
MutexLock lock ( & mu_ ) ;
shutdown_ = true ;
is_serving_ = false ;
// Orphan the connections so that they can start cleaning up.
connections = std : : move ( connections_ ) ;
// If the listener is currently set to be serving but has not been started
// yet, it means that `grpc_tcp_server_start` is in progress. Wait for the
// operation to finish to avoid causing races.
while ( is_serving_ & & ! started_ ) {
started_cv_ . Wait ( & mu_ ) ;
}
tcp_server = tcp_server_ ;
}
grpc_tcp_server_shutdown_listeners ( tcp_server ) ;