|
|
@ -72,8 +72,7 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
|
|
|
|
|
|
|
|
// Do not instantiate directly. Use one of the factory methods above.
|
|
|
|
// Do not instantiate directly. Use one of the factory methods above.
|
|
|
|
Chttp2ServerListener(Server* server, grpc_channel_args* args, |
|
|
|
Chttp2ServerListener(Server* server, grpc_channel_args* args, |
|
|
|
Chttp2ServerArgsModifier args_modifier, |
|
|
|
Chttp2ServerArgsModifier args_modifier); |
|
|
|
grpc_resource_quota* resource_quota); |
|
|
|
|
|
|
|
~Chttp2ServerListener() override; |
|
|
|
~Chttp2ServerListener() override; |
|
|
|
|
|
|
|
|
|
|
|
void Start(Server* server, |
|
|
|
void Start(Server* server, |
|
|
@ -111,8 +110,7 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref, |
|
|
|
HandshakingState(RefCountedPtr<ActiveConnection> connection_ref, |
|
|
|
grpc_pollset* accepting_pollset, |
|
|
|
grpc_pollset* accepting_pollset, |
|
|
|
grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_channel_args* args, |
|
|
|
grpc_channel_args* args); |
|
|
|
grpc_resource_user* channel_resource_user); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
~HandshakingState() override; |
|
|
|
~HandshakingState() override; |
|
|
|
|
|
|
|
|
|
|
@ -138,13 +136,11 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_); |
|
|
|
grpc_closure on_timeout_ ABSL_GUARDED_BY(&connection_->mu_); |
|
|
|
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_); |
|
|
|
grpc_closure on_receive_settings_ ABSL_GUARDED_BY(&connection_->mu_); |
|
|
|
grpc_pollset_set* const interested_parties_; |
|
|
|
grpc_pollset_set* const interested_parties_; |
|
|
|
grpc_resource_user* channel_resource_user_; |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
ActiveConnection(grpc_pollset* accepting_pollset, |
|
|
|
ActiveConnection(grpc_pollset* accepting_pollset, |
|
|
|
grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_channel_args* args, |
|
|
|
grpc_channel_args* args); |
|
|
|
grpc_resource_user* channel_resource_user); |
|
|
|
|
|
|
|
~ActiveConnection() override; |
|
|
|
~ActiveConnection() override; |
|
|
|
|
|
|
|
|
|
|
|
void Orphan() override; |
|
|
|
void Orphan() override; |
|
|
@ -238,7 +234,6 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_); |
|
|
|
grpc_closure tcp_server_shutdown_complete_ ABSL_GUARDED_BY(mu_); |
|
|
|
grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; |
|
|
|
grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; |
|
|
|
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_; |
|
|
|
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_; |
|
|
|
grpc_resource_quota* resource_quota_; |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
@ -310,14 +305,13 @@ grpc_millis GetConnectionDeadline(const grpc_channel_args* args) { |
|
|
|
Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( |
|
|
|
Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( |
|
|
|
RefCountedPtr<ActiveConnection> connection_ref, |
|
|
|
RefCountedPtr<ActiveConnection> connection_ref, |
|
|
|
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_channel_args* args, grpc_resource_user* channel_resource_user) |
|
|
|
grpc_channel_args* args) |
|
|
|
: connection_(std::move(connection_ref)), |
|
|
|
: connection_(std::move(connection_ref)), |
|
|
|
accepting_pollset_(accepting_pollset), |
|
|
|
accepting_pollset_(accepting_pollset), |
|
|
|
acceptor_(acceptor), |
|
|
|
acceptor_(acceptor), |
|
|
|
handshake_mgr_(MakeRefCounted<HandshakeManager>()), |
|
|
|
handshake_mgr_(MakeRefCounted<HandshakeManager>()), |
|
|
|
deadline_(GetConnectionDeadline(args)), |
|
|
|
deadline_(GetConnectionDeadline(args)), |
|
|
|
interested_parties_(grpc_pollset_set_create()), |
|
|
|
interested_parties_(grpc_pollset_set_create()) { |
|
|
|
channel_resource_user_(channel_resource_user) { |
|
|
|
|
|
|
|
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); |
|
|
|
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); |
|
|
|
HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args, |
|
|
|
HandshakerRegistry::AddHandshakers(HANDSHAKER_SERVER, args, |
|
|
|
interested_parties_, handshake_mgr_.get()); |
|
|
|
interested_parties_, handshake_mgr_.get()); |
|
|
@ -326,9 +320,6 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( |
|
|
|
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { |
|
|
|
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { |
|
|
|
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); |
|
|
|
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); |
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
if (channel_resource_user_ != nullptr) { |
|
|
|
|
|
|
|
grpc_resource_user_unref(channel_resource_user_); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_free(acceptor_); |
|
|
|
gpr_free(acceptor_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -389,12 +380,16 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( |
|
|
|
OrphanablePtr<HandshakingState> handshaking_state_ref; |
|
|
|
OrphanablePtr<HandshakingState> handshaking_state_ref; |
|
|
|
RefCountedPtr<HandshakeManager> handshake_mgr; |
|
|
|
RefCountedPtr<HandshakeManager> handshake_mgr; |
|
|
|
bool cleanup_connection = false; |
|
|
|
bool cleanup_connection = false; |
|
|
|
|
|
|
|
bool free_resource_quota = false; |
|
|
|
|
|
|
|
grpc_resource_user* resource_user = |
|
|
|
|
|
|
|
self->connection_->listener_->server_->default_resource_user(); |
|
|
|
{ |
|
|
|
{ |
|
|
|
MutexLock connection_lock(&self->connection_->mu_); |
|
|
|
MutexLock connection_lock(&self->connection_->mu_); |
|
|
|
if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) { |
|
|
|
if (error != GRPC_ERROR_NONE || self->connection_->shutdown_) { |
|
|
|
std::string error_str = grpc_error_std_string(error); |
|
|
|
std::string error_str = grpc_error_std_string(error); |
|
|
|
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str()); |
|
|
|
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str.c_str()); |
|
|
|
cleanup_connection = true; |
|
|
|
cleanup_connection = true; |
|
|
|
|
|
|
|
free_resource_quota = true; |
|
|
|
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { |
|
|
|
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { |
|
|
|
// We were shut down or stopped serving after handshaking completed
|
|
|
|
// We were shut down or stopped serving after handshaking completed
|
|
|
|
// successfully, so destroy the endpoint here.
|
|
|
|
// successfully, so destroy the endpoint here.
|
|
|
@ -414,17 +409,12 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( |
|
|
|
// code, so we can just clean up here without creating a transport.
|
|
|
|
// code, so we can just clean up here without creating a transport.
|
|
|
|
if (args->endpoint != nullptr) { |
|
|
|
if (args->endpoint != nullptr) { |
|
|
|
grpc_transport* transport = grpc_create_chttp2_transport( |
|
|
|
grpc_transport* transport = grpc_create_chttp2_transport( |
|
|
|
args->args, args->endpoint, false, |
|
|
|
args->args, args->endpoint, false, resource_user); |
|
|
|
grpc_resource_user_create( |
|
|
|
|
|
|
|
self->connection_->listener_->resource_quota_, |
|
|
|
|
|
|
|
absl::StrCat(grpc_endpoint_get_peer(args->endpoint), |
|
|
|
|
|
|
|
":chttp2_server_transport"))); |
|
|
|
|
|
|
|
grpc_error_handle channel_init_err = |
|
|
|
grpc_error_handle channel_init_err = |
|
|
|
self->connection_->listener_->server_->SetupTransport( |
|
|
|
self->connection_->listener_->server_->SetupTransport( |
|
|
|
transport, self->accepting_pollset_, args->args, |
|
|
|
transport, self->accepting_pollset_, args->args, |
|
|
|
grpc_chttp2_transport_get_socket_node(transport), |
|
|
|
grpc_chttp2_transport_get_socket_node(transport), |
|
|
|
self->channel_resource_user_, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); |
|
|
|
resource_user); |
|
|
|
self->channel_resource_user_ = nullptr; |
|
|
|
|
|
|
|
if (channel_init_err == GRPC_ERROR_NONE) { |
|
|
|
if (channel_init_err == GRPC_ERROR_NONE) { |
|
|
|
// Use notify_on_receive_settings callback to enforce the
|
|
|
|
// Use notify_on_receive_settings callback to enforce the
|
|
|
|
// handshake deadline.
|
|
|
|
// handshake deadline.
|
|
|
@ -472,10 +462,12 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( |
|
|
|
grpc_slice_buffer_destroy_internal(args->read_buffer); |
|
|
|
grpc_slice_buffer_destroy_internal(args->read_buffer); |
|
|
|
gpr_free(args->read_buffer); |
|
|
|
gpr_free(args->read_buffer); |
|
|
|
cleanup_connection = true; |
|
|
|
cleanup_connection = true; |
|
|
|
|
|
|
|
free_resource_quota = true; |
|
|
|
grpc_channel_args_destroy(args->args); |
|
|
|
grpc_channel_args_destroy(args->args); |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
cleanup_connection = true; |
|
|
|
cleanup_connection = true; |
|
|
|
|
|
|
|
free_resource_quota = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
// Since the handshake manager is done, the connection no longer needs to
|
|
|
|
// Since the handshake manager is done, the connection no longer needs to
|
|
|
@ -488,9 +480,8 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( |
|
|
|
gpr_free(self->acceptor_); |
|
|
|
gpr_free(self->acceptor_); |
|
|
|
self->acceptor_ = nullptr; |
|
|
|
self->acceptor_ = nullptr; |
|
|
|
OrphanablePtr<ActiveConnection> connection; |
|
|
|
OrphanablePtr<ActiveConnection> connection; |
|
|
|
if (self->channel_resource_user_ != nullptr) { |
|
|
|
if (free_resource_quota && resource_user != nullptr) { |
|
|
|
grpc_resource_user_free(self->channel_resource_user_, |
|
|
|
grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); |
|
|
|
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
if (cleanup_connection) { |
|
|
|
if (cleanup_connection) { |
|
|
|
MutexLock listener_lock(&self->connection_->listener_->mu_); |
|
|
|
MutexLock listener_lock(&self->connection_->listener_->mu_); |
|
|
@ -510,9 +501,9 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone( |
|
|
|
|
|
|
|
|
|
|
|
Chttp2ServerListener::ActiveConnection::ActiveConnection( |
|
|
|
Chttp2ServerListener::ActiveConnection::ActiveConnection( |
|
|
|
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor, |
|
|
|
grpc_channel_args* args, grpc_resource_user* channel_resource_user) |
|
|
|
grpc_channel_args* args) |
|
|
|
: handshaking_state_(MakeOrphanable<HandshakingState>( |
|
|
|
: handshaking_state_(MakeOrphanable<HandshakingState>( |
|
|
|
Ref(), accepting_pollset, acceptor, args, channel_resource_user)) { |
|
|
|
Ref(), accepting_pollset, acceptor, args)) { |
|
|
|
GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this, |
|
|
|
GRPC_CLOSURE_INIT(&on_close_, ActiveConnection::OnClose, this, |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
} |
|
|
|
} |
|
|
@ -596,14 +587,9 @@ grpc_error_handle Chttp2ServerListener::Create( |
|
|
|
// easier without using goto.
|
|
|
|
// easier without using goto.
|
|
|
|
grpc_error_handle error = [&]() { |
|
|
|
grpc_error_handle error = [&]() { |
|
|
|
// Create Chttp2ServerListener.
|
|
|
|
// Create Chttp2ServerListener.
|
|
|
|
listener = new Chttp2ServerListener( |
|
|
|
listener = new Chttp2ServerListener(server, args, args_modifier); |
|
|
|
server, args, args_modifier, |
|
|
|
error = grpc_tcp_server_create(&listener->tcp_server_shutdown_complete_, |
|
|
|
grpc_resource_quota_from_channel_args(args, true)); |
|
|
|
args, &listener->tcp_server_); |
|
|
|
grpc_resource_quota_ref_internal(listener->resource_quota_); |
|
|
|
|
|
|
|
error = grpc_tcp_server_create( |
|
|
|
|
|
|
|
&listener->tcp_server_shutdown_complete_, args, |
|
|
|
|
|
|
|
grpc_slice_allocator_factory_create(listener->resource_quota_), |
|
|
|
|
|
|
|
&listener->tcp_server_); |
|
|
|
|
|
|
|
if (error != GRPC_ERROR_NONE) return error; |
|
|
|
if (error != GRPC_ERROR_NONE) return error; |
|
|
|
if (server->config_fetcher() != nullptr) { |
|
|
|
if (server->config_fetcher() != nullptr) { |
|
|
|
listener->resolved_address_ = *addr; |
|
|
|
listener->resolved_address_ = *addr; |
|
|
@ -644,14 +630,10 @@ grpc_error_handle Chttp2ServerListener::Create( |
|
|
|
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( |
|
|
|
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( |
|
|
|
Server* server, const char* name, grpc_channel_args* args, |
|
|
|
Server* server, const char* name, grpc_channel_args* args, |
|
|
|
Chttp2ServerArgsModifier args_modifier) { |
|
|
|
Chttp2ServerArgsModifier args_modifier) { |
|
|
|
Chttp2ServerListener* listener = new Chttp2ServerListener( |
|
|
|
Chttp2ServerListener* listener = |
|
|
|
server, args, args_modifier, |
|
|
|
new Chttp2ServerListener(server, args, args_modifier); |
|
|
|
grpc_resource_quota_from_channel_args(args, true)); |
|
|
|
|
|
|
|
grpc_resource_quota_ref_internal(listener->resource_quota_); |
|
|
|
|
|
|
|
grpc_error_handle error = grpc_tcp_server_create( |
|
|
|
grpc_error_handle error = grpc_tcp_server_create( |
|
|
|
&listener->tcp_server_shutdown_complete_, args, |
|
|
|
&listener->tcp_server_shutdown_complete_, args, &listener->tcp_server_); |
|
|
|
grpc_slice_allocator_factory_create(listener->resource_quota_), |
|
|
|
|
|
|
|
&listener->tcp_server_); |
|
|
|
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
delete listener; |
|
|
|
delete listener; |
|
|
|
return error; |
|
|
|
return error; |
|
|
@ -666,11 +648,8 @@ grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( |
|
|
|
|
|
|
|
|
|
|
|
Chttp2ServerListener::Chttp2ServerListener( |
|
|
|
Chttp2ServerListener::Chttp2ServerListener( |
|
|
|
Server* server, grpc_channel_args* args, |
|
|
|
Server* server, grpc_channel_args* args, |
|
|
|
Chttp2ServerArgsModifier args_modifier, grpc_resource_quota* resource_quota) |
|
|
|
Chttp2ServerArgsModifier args_modifier) |
|
|
|
: server_(server), |
|
|
|
: server_(server), args_modifier_(args_modifier), args_(args) { |
|
|
|
args_modifier_(args_modifier), |
|
|
|
|
|
|
|
args_(args), |
|
|
|
|
|
|
|
resource_quota_(resource_quota) { |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete, |
|
|
|
GRPC_CLOSURE_INIT(&tcp_server_shutdown_complete_, TcpServerShutdownComplete, |
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
this, grpc_schedule_on_exec_ctx); |
|
|
|
} |
|
|
|
} |
|
|
@ -683,7 +662,6 @@ Chttp2ServerListener::~Chttp2ServerListener() { |
|
|
|
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE); |
|
|
|
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, GRPC_ERROR_NONE); |
|
|
|
ExecCtx::Get()->Flush(); |
|
|
|
ExecCtx::Get()->Flush(); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_resource_quota_unref_internal(resource_quota_); |
|
|
|
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -766,11 +744,8 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, |
|
|
|
return; |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_resource_user* channel_resource_user = grpc_resource_user_create( |
|
|
|
auto connection = |
|
|
|
self->resource_quota_, |
|
|
|
MakeOrphanable<ActiveConnection>(accepting_pollset, acceptor, args); |
|
|
|
absl::StrCat(grpc_endpoint_get_peer(tcp), ":server_channel")); |
|
|
|
|
|
|
|
auto connection = MakeOrphanable<ActiveConnection>( |
|
|
|
|
|
|
|
accepting_pollset, acceptor, args, channel_resource_user); |
|
|
|
|
|
|
|
// We no longer own acceptor
|
|
|
|
// We no longer own acceptor
|
|
|
|
acceptor = nullptr; |
|
|
|
acceptor = nullptr; |
|
|
|
// Hold a ref to connection to allow starting handshake outside the
|
|
|
|
// Hold a ref to connection to allow starting handshake outside the
|
|
|
@ -781,7 +756,10 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, |
|
|
|
MutexLock lock(&self->mu_); |
|
|
|
MutexLock lock(&self->mu_); |
|
|
|
// Shutdown the the connection if listener's stopped serving.
|
|
|
|
// Shutdown the the connection if listener's stopped serving.
|
|
|
|
if (!self->shutdown_ && self->is_serving_) { |
|
|
|
if (!self->shutdown_ && self->is_serving_) { |
|
|
|
if (!grpc_resource_user_safe_alloc(channel_resource_user, |
|
|
|
grpc_resource_user* resource_user = |
|
|
|
|
|
|
|
self->server_->default_resource_user(); |
|
|
|
|
|
|
|
if (resource_user != nullptr && |
|
|
|
|
|
|
|
!grpc_resource_user_safe_alloc(resource_user, |
|
|
|
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { |
|
|
|
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { |
|
|
|
gpr_log( |
|
|
|
gpr_log( |
|
|
|
GPR_INFO, |
|
|
|
GPR_INFO, |
|
|
|