|
|
|
@ -42,6 +42,7 @@ |
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
|
#include <grpc/grpc_posix.h> |
|
|
|
|
#include <grpc/impl/channel_arg_names.h> |
|
|
|
|
#include <grpc/passive_listener.h> |
|
|
|
|
#include <grpc/slice_buffer.h> |
|
|
|
|
#include <grpc/support/alloc.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
@ -58,6 +59,8 @@ |
|
|
|
|
#include "src/core/lib/config/core_configuration.h" |
|
|
|
|
#include "src/core/lib/debug/trace.h" |
|
|
|
|
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
|
|
|
|
#include "src/core/lib/event_engine/extensions/supports_fd.h" |
|
|
|
|
#include "src/core/lib/event_engine/query_extensions.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
|
#include "src/core/lib/gprpp/orphanable.h" |
|
|
|
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
|
|
@ -67,6 +70,7 @@ |
|
|
|
|
#include "src/core/lib/gprpp/unique_type_name.h" |
|
|
|
|
#include "src/core/lib/iomgr/closure.h" |
|
|
|
|
#include "src/core/lib/iomgr/endpoint.h" |
|
|
|
|
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" |
|
|
|
|
#include "src/core/lib/iomgr/iomgr_fwd.h" |
|
|
|
|
#include "src/core/lib/iomgr/pollset_set.h" |
|
|
|
|
#include "src/core/lib/iomgr/resolve_address.h" |
|
|
|
@ -93,9 +97,11 @@ |
|
|
|
|
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
|
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
using ::grpc_event_engine::experimental::EventEngine; |
|
|
|
|
using grpc_event_engine::experimental::ChannelArgsEndpointConfig; |
|
|
|
|
using grpc_event_engine::experimental::EventEngine; |
|
|
|
|
using grpc_event_engine::experimental::EventEngineSupportsFdExtension; |
|
|
|
|
using grpc_event_engine::experimental::QueryExtension; |
|
|
|
|
|
|
|
|
|
const char kUnixUriPrefix[] = "unix:"; |
|
|
|
|
const char kUnixAbstractUriPrefix[] = "unix-abstract:"; |
|
|
|
@ -112,14 +118,23 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
|
Server* server, const char* name, const ChannelArgs& args, |
|
|
|
|
Chttp2ServerArgsModifier args_modifier); |
|
|
|
|
|
|
|
|
|
static Chttp2ServerListener* CreateForPassiveListener( |
|
|
|
|
Server* server, const ChannelArgs& args, |
|
|
|
|
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener); |
|
|
|
|
|
|
|
|
|
// Do not instantiate directly. Use one of the factory methods above.
|
|
|
|
|
Chttp2ServerListener(Server* server, const ChannelArgs& args, |
|
|
|
|
Chttp2ServerArgsModifier args_modifier); |
|
|
|
|
Chttp2ServerArgsModifier args_modifier, |
|
|
|
|
grpc_server_config_fetcher* config_fetcher, |
|
|
|
|
std::shared_ptr<experimental::PassiveListenerImpl> |
|
|
|
|
passive_listener = nullptr); |
|
|
|
|
~Chttp2ServerListener() override; |
|
|
|
|
|
|
|
|
|
void Start(Server* server, |
|
|
|
|
const std::vector<grpc_pollset*>* pollsets) override; |
|
|
|
|
|
|
|
|
|
void AcceptConnectedEndpoint(std::unique_ptr<EventEngine::Endpoint> endpoint); |
|
|
|
|
|
|
|
|
|
channelz::ListenSocketNode* channelz_listen_socket_node() const override { |
|
|
|
|
return channelz_listen_socket_.get(); |
|
|
|
|
} |
|
|
|
@ -129,6 +144,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
|
void Orphan() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
friend class experimental::PassiveListenerImpl; |
|
|
|
|
|
|
|
|
|
class ConfigFetcherWatcher |
|
|
|
|
: public grpc_server_config_fetcher::WatcherInterface { |
|
|
|
|
public: |
|
|
|
@ -235,34 +252,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
|
static void DestroyListener(Server* /*server*/, void* arg, |
|
|
|
|
grpc_closure* destroy_done); |
|
|
|
|
|
|
|
|
|
// The interface required by RefCountedPtr<> has been manually implemented
|
|
|
|
|
// here to take a ref on tcp_server_ instead. Note that, the handshaker
|
|
|
|
|
// needs tcp_server_ to exist for the lifetime of the handshake since it's
|
|
|
|
|
// needed by acceptor. Sharing refs between the listener and tcp_server_ is
|
|
|
|
|
// just an optimization to avoid taking additional refs on the listener,
|
|
|
|
|
// since TcpServerShutdownComplete already holds a ref to the listener.
|
|
|
|
|
void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); } |
|
|
|
|
void IncrementRefCount(const DebugLocation& /* location */, |
|
|
|
|
const char* /* reason */) { |
|
|
|
|
IncrementRefCount(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref() { |
|
|
|
|
IncrementRefCount(); |
|
|
|
|
return RefCountedPtr<Chttp2ServerListener>(this); |
|
|
|
|
} |
|
|
|
|
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref( |
|
|
|
|
const DebugLocation& /* location */, const char* /* reason */) { |
|
|
|
|
return Ref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Unref() { grpc_tcp_server_unref(tcp_server_); } |
|
|
|
|
void Unref(const DebugLocation& /* location */, const char* /* reason */) { |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server* const server_; |
|
|
|
|
grpc_tcp_server* tcp_server_; |
|
|
|
|
Server* const server_ = nullptr; |
|
|
|
|
grpc_tcp_server* tcp_server_ = nullptr; |
|
|
|
|
grpc_resolved_address resolved_address_; |
|
|
|
|
Chttp2ServerArgsModifier const args_modifier_; |
|
|
|
|
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; |
|
|
|
@ -285,6 +276,10 @@ class Chttp2ServerListener : public Server::ListenerInterface { |
|
|
|
|
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_; |
|
|
|
|
MemoryQuotaRefPtr memory_quota_; |
|
|
|
|
ConnectionQuotaRefPtr connection_quota_; |
|
|
|
|
grpc_server_config_fetcher* config_fetcher_ = nullptr; |
|
|
|
|
// TODO(yashykt): consider using absl::variant<> to minimize memory usage for
|
|
|
|
|
// disjoint cases where different fields are used.
|
|
|
|
|
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -381,13 +376,17 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( |
|
|
|
|
handshake_mgr_(MakeRefCounted<HandshakeManager>()), |
|
|
|
|
deadline_(GetConnectionDeadline(args)), |
|
|
|
|
interested_parties_(grpc_pollset_set_create()) { |
|
|
|
|
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); |
|
|
|
|
if (accepting_pollset != nullptr) { |
|
|
|
|
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); |
|
|
|
|
} |
|
|
|
|
CoreConfiguration::Get().handshaker_registry().AddHandshakers( |
|
|
|
|
HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { |
|
|
|
|
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); |
|
|
|
|
if (accepting_pollset_ != nullptr) { |
|
|
|
|
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); |
|
|
|
|
} |
|
|
|
|
grpc_pollset_set_destroy(interested_parties_); |
|
|
|
|
gpr_free(acceptor_); |
|
|
|
|
} |
|
|
|
@ -589,7 +588,11 @@ Chttp2ServerListener::ActiveConnection::ActiveConnection( |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Chttp2ServerListener::ActiveConnection::~ActiveConnection() {} |
|
|
|
|
Chttp2ServerListener::ActiveConnection::~ActiveConnection() { |
|
|
|
|
if (listener_ != nullptr && listener_->tcp_server_ != nullptr) { |
|
|
|
|
grpc_tcp_server_unref(listener_->tcp_server_); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Chttp2ServerListener::ActiveConnection::Orphan() { |
|
|
|
|
OrphanablePtr<HandshakingState> handshaking_state; |
|
|
|
@ -637,6 +640,9 @@ void Chttp2ServerListener::ActiveConnection::Start( |
|
|
|
|
const ChannelArgs& args) { |
|
|
|
|
RefCountedPtr<HandshakingState> handshaking_state_ref; |
|
|
|
|
listener_ = std::move(listener); |
|
|
|
|
if (listener_->tcp_server_ != nullptr) { |
|
|
|
|
grpc_tcp_server_ref(listener_->tcp_server_); |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
|
ReleasableMutexLock lock(&mu_); |
|
|
|
|
if (shutdown_) { |
|
|
|
@ -709,83 +715,82 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() { |
|
|
|
|
grpc_error_handle Chttp2ServerListener::Create( |
|
|
|
|
Server* server, grpc_resolved_address* addr, const ChannelArgs& args, |
|
|
|
|
Chttp2ServerArgsModifier args_modifier, int* port_num) { |
|
|
|
|
Chttp2ServerListener* listener = nullptr; |
|
|
|
|
// The bulk of this method is inside of a lambda to make cleanup
|
|
|
|
|
// easier without using goto.
|
|
|
|
|
grpc_error_handle error = [&]() { |
|
|
|
|
grpc_error_handle error; |
|
|
|
|
// Create Chttp2ServerListener.
|
|
|
|
|
listener = new Chttp2ServerListener(server, args, args_modifier); |
|
|
|
|
error = grpc_tcp_server_create( |
|
|
|
|
&listener->tcp_server_shutdown_complete_, |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), |
|
|
|
|
OnAccept, listener, &listener->tcp_server_); |
|
|
|
|
// Create Chttp2ServerListener.
|
|
|
|
|
OrphanablePtr<Chttp2ServerListener> listener = |
|
|
|
|
MakeOrphanable<Chttp2ServerListener>(server, args, args_modifier, |
|
|
|
|
server->config_fetcher()); |
|
|
|
|
// The tcp_server will be unreffed when the listener is orphaned, which could
|
|
|
|
|
// be at the end of this function if the listener was not added to the
|
|
|
|
|
// server's set of listeners.
|
|
|
|
|
grpc_error_handle error = grpc_tcp_server_create( |
|
|
|
|
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), |
|
|
|
|
OnAccept, listener.get(), &listener->tcp_server_); |
|
|
|
|
if (!error.ok()) return error; |
|
|
|
|
if (listener->config_fetcher_ != nullptr) { |
|
|
|
|
listener->resolved_address_ = *addr; |
|
|
|
|
// TODO(yashykt): Consider binding so as to be able to return the port
|
|
|
|
|
// number.
|
|
|
|
|
} else { |
|
|
|
|
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num); |
|
|
|
|
if (!error.ok()) return error; |
|
|
|
|
if (server->config_fetcher() != nullptr) { |
|
|
|
|
listener->resolved_address_ = *addr; |
|
|
|
|
// TODO(yashykt): Consider binding so as to be able to return the port
|
|
|
|
|
// number.
|
|
|
|
|
} else { |
|
|
|
|
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num); |
|
|
|
|
if (!error.ok()) return error; |
|
|
|
|
} |
|
|
|
|
// Create channelz node.
|
|
|
|
|
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) |
|
|
|
|
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { |
|
|
|
|
auto string_address = grpc_sockaddr_to_uri(addr); |
|
|
|
|
if (!string_address.ok()) { |
|
|
|
|
return GRPC_ERROR_CREATE(string_address.status().ToString()); |
|
|
|
|
} |
|
|
|
|
listener->channelz_listen_socket_ = |
|
|
|
|
MakeRefCounted<channelz::ListenSocketNode>( |
|
|
|
|
*string_address, |
|
|
|
|
absl::StrCat("chttp2 listener ", *string_address)); |
|
|
|
|
} |
|
|
|
|
// Register with the server only upon success
|
|
|
|
|
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener)); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
}(); |
|
|
|
|
if (!error.ok()) { |
|
|
|
|
if (listener != nullptr) { |
|
|
|
|
if (listener->tcp_server_ != nullptr) { |
|
|
|
|
// listener is deleted when tcp_server_ is shutdown.
|
|
|
|
|
grpc_tcp_server_unref(listener->tcp_server_); |
|
|
|
|
} else { |
|
|
|
|
delete listener; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Create channelz node.
|
|
|
|
|
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) |
|
|
|
|
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { |
|
|
|
|
auto string_address = grpc_sockaddr_to_uri(addr); |
|
|
|
|
if (!string_address.ok()) { |
|
|
|
|
return GRPC_ERROR_CREATE(string_address.status().ToString()); |
|
|
|
|
} |
|
|
|
|
listener->channelz_listen_socket_ = |
|
|
|
|
MakeRefCounted<channelz::ListenSocketNode>( |
|
|
|
|
*string_address, absl::StrCat("chttp2 listener ", *string_address)); |
|
|
|
|
} |
|
|
|
|
return error; |
|
|
|
|
// Register with the server only upon success
|
|
|
|
|
server->AddListener(std::move(listener)); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( |
|
|
|
|
Server* server, const char* name, const ChannelArgs& args, |
|
|
|
|
Chttp2ServerArgsModifier args_modifier) { |
|
|
|
|
Chttp2ServerListener* listener = |
|
|
|
|
new Chttp2ServerListener(server, args, args_modifier); |
|
|
|
|
auto listener = MakeOrphanable<Chttp2ServerListener>( |
|
|
|
|
server, args, args_modifier, server->config_fetcher()); |
|
|
|
|
grpc_error_handle error = grpc_tcp_server_create( |
|
|
|
|
&listener->tcp_server_shutdown_complete_, |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), |
|
|
|
|
OnAccept, listener, &listener->tcp_server_); |
|
|
|
|
if (!error.ok()) { |
|
|
|
|
delete listener; |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), |
|
|
|
|
OnAccept, listener.get(), &listener->tcp_server_); |
|
|
|
|
if (!error.ok()) return error; |
|
|
|
|
// TODO(yangg) channelz
|
|
|
|
|
TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name); |
|
|
|
|
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_); |
|
|
|
|
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener)); |
|
|
|
|
server->AddListener(std::move(listener)); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener( |
|
|
|
|
Server* server, const ChannelArgs& args, |
|
|
|
|
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener) { |
|
|
|
|
// TODO(hork): figure out how to handle channelz in this case
|
|
|
|
|
auto listener = MakeOrphanable<Chttp2ServerListener>( |
|
|
|
|
server, args, /*args_modifier=*/ |
|
|
|
|
[](const ChannelArgs& args, grpc_error_handle*) { return args; }, nullptr, |
|
|
|
|
std::move(passive_listener)); |
|
|
|
|
auto listener_ptr = listener.get(); |
|
|
|
|
server->AddListener(std::move(listener)); |
|
|
|
|
return listener_ptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Chttp2ServerListener::Chttp2ServerListener( |
|
|
|
|
Server* server, const ChannelArgs& args, |
|
|
|
|
Chttp2ServerArgsModifier args_modifier) |
|
|
|
|
Chttp2ServerArgsModifier args_modifier, |
|
|
|
|
grpc_server_config_fetcher* config_fetcher, |
|
|
|
|
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener) |
|
|
|
|
: server_(server), |
|
|
|
|
args_modifier_(args_modifier), |
|
|
|
|
args_(args), |
|
|
|
|
memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()), |
|
|
|
|
connection_quota_(MakeRefCounted<ConnectionQuota>()) { |
|
|
|
|
connection_quota_(MakeRefCounted<ConnectionQuota>()), |
|
|
|
|
config_fetcher_(config_fetcher), |
|
|
|
|
passive_listener_(std::move(passive_listener)) { |
|
|
|
|
auto max_allowed_incoming_connections = |
|
|
|
|
args.GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS); |
|
|
|
|
if (max_allowed_incoming_connections.has_value()) { |
|
|
|
@ -800,6 +805,9 @@ Chttp2ServerListener::~Chttp2ServerListener() { |
|
|
|
|
// Flush queued work before destroying handshaker factory, since that
|
|
|
|
|
// may do a synchronous unref.
|
|
|
|
|
ExecCtx::Get()->Flush(); |
|
|
|
|
if (passive_listener_ != nullptr) { |
|
|
|
|
passive_listener_->ListenerDestroyed(); |
|
|
|
|
} |
|
|
|
|
if (on_destroy_done_ != nullptr) { |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus()); |
|
|
|
|
ExecCtx::Get()->Flush(); |
|
|
|
@ -809,10 +817,11 @@ Chttp2ServerListener::~Chttp2ServerListener() { |
|
|
|
|
// Server callback: start listening on our ports
|
|
|
|
|
void Chttp2ServerListener::Start( |
|
|
|
|
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) { |
|
|
|
|
if (server_->config_fetcher() != nullptr) { |
|
|
|
|
auto watcher = std::make_unique<ConfigFetcherWatcher>(Ref()); |
|
|
|
|
if (config_fetcher_ != nullptr) { |
|
|
|
|
auto watcher = std::make_unique<ConfigFetcherWatcher>( |
|
|
|
|
RefAsSubclass<Chttp2ServerListener>()); |
|
|
|
|
config_fetcher_watcher_ = watcher.get(); |
|
|
|
|
server_->config_fetcher()->StartWatch( |
|
|
|
|
config_fetcher_->StartWatch( |
|
|
|
|
grpc_sockaddr_to_string(&resolved_address_, false).value(), |
|
|
|
|
std::move(watcher)); |
|
|
|
|
} else { |
|
|
|
@ -826,7 +835,9 @@ void Chttp2ServerListener::Start( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Chttp2ServerListener::StartListening() { |
|
|
|
|
grpc_tcp_server_start(tcp_server_, &server_->pollsets()); |
|
|
|
|
if (tcp_server_ != nullptr) { |
|
|
|
|
grpc_tcp_server_start(tcp_server_, &server_->pollsets()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { |
|
|
|
@ -834,6 +845,12 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { |
|
|
|
|
on_destroy_done_ = on_destroy_done; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Chttp2ServerListener::AcceptConnectedEndpoint( |
|
|
|
|
std::unique_ptr<EventEngine::Endpoint> endpoint) { |
|
|
|
|
OnAccept(this, grpc_event_engine_endpoint_create(std::move(endpoint)), |
|
|
|
|
/*accepting_pollset=*/nullptr, /*acceptor=*/nullptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, |
|
|
|
|
grpc_pollset* accepting_pollset, |
|
|
|
|
grpc_tcp_server_acceptor* acceptor) { |
|
|
|
@ -858,7 +875,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, |
|
|
|
|
endpoint_cleanup(error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (self->server_->config_fetcher() != nullptr) { |
|
|
|
|
if (self->config_fetcher_ != nullptr) { |
|
|
|
|
if (connection_manager == nullptr) { |
|
|
|
|
grpc_error_handle error = GRPC_ERROR_CREATE( |
|
|
|
|
"No ConnectionManager configured. Closing connection."); |
|
|
|
@ -899,7 +916,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, |
|
|
|
|
// heap-use-after-free issues where `Ref()` is invoked when the ref of
|
|
|
|
|
// tcp_server_ has already reached 0. (Ref() implementation of
|
|
|
|
|
// Chttp2ServerListener is grpc_tcp_server_ref().)
|
|
|
|
|
listener_ref = self->Ref(); |
|
|
|
|
listener_ref = self->RefAsSubclass<Chttp2ServerListener>(); |
|
|
|
|
self->connections_.emplace(connection.get(), std::move(connection)); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -914,7 +931,7 @@ void Chttp2ServerListener::TcpServerShutdownComplete( |
|
|
|
|
void* arg, grpc_error_handle /*error*/) { |
|
|
|
|
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg); |
|
|
|
|
self->channelz_listen_socket_.reset(); |
|
|
|
|
delete self; |
|
|
|
|
self->Unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Server callback: destroy the tcp listener (so we don't generate further
|
|
|
|
@ -923,7 +940,8 @@ void Chttp2ServerListener::Orphan() { |
|
|
|
|
// Cancel the watch before shutting down so as to avoid holding a ref to the
|
|
|
|
|
// listener in the watcher.
|
|
|
|
|
if (config_fetcher_watcher_ != nullptr) { |
|
|
|
|
server_->config_fetcher()->CancelWatch(config_fetcher_watcher_); |
|
|
|
|
CHECK_NE(config_fetcher_, nullptr); |
|
|
|
|
config_fetcher_->CancelWatch(config_fetcher_watcher_); |
|
|
|
|
} |
|
|
|
|
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections; |
|
|
|
|
grpc_tcp_server* tcp_server; |
|
|
|
@ -941,12 +959,14 @@ void Chttp2ServerListener::Orphan() { |
|
|
|
|
} |
|
|
|
|
tcp_server = tcp_server_; |
|
|
|
|
} |
|
|
|
|
grpc_tcp_server_shutdown_listeners(tcp_server); |
|
|
|
|
grpc_tcp_server_unref(tcp_server); |
|
|
|
|
if (tcp_server != nullptr) { |
|
|
|
|
grpc_tcp_server_shutdown_listeners(tcp_server); |
|
|
|
|
grpc_tcp_server_unref(tcp_server); |
|
|
|
|
} else { |
|
|
|
|
Unref(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Chttp2ServerAddPort()
|
|
|
|
|
//
|
|
|
|
@ -1047,6 +1067,50 @@ ChannelArgs ModifyArgsForConnection(const ChannelArgs& args, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
namespace experimental { |
|
|
|
|
|
|
|
|
|
absl::Status PassiveListenerImpl::AcceptConnectedEndpoint( |
|
|
|
|
std::unique_ptr<EventEngine::Endpoint> endpoint) { |
|
|
|
|
CHECK_NE(server_.get(), nullptr); |
|
|
|
|
RefCountedPtr<Chttp2ServerListener> listener; |
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
if (listener_ != nullptr) { |
|
|
|
|
listener = |
|
|
|
|
listener_->RefIfNonZero().TakeAsSubclass<Chttp2ServerListener>(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (listener == nullptr) { |
|
|
|
|
return absl::UnavailableError("passive listener already shut down"); |
|
|
|
|
} |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
listener->AcceptConnectedEndpoint(std::move(endpoint)); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absl::Status PassiveListenerImpl::AcceptConnectedFd(int fd) { |
|
|
|
|
CHECK_NE(server_.get(), nullptr); |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto& args = server_->channel_args(); |
|
|
|
|
auto* supports_fd = QueryExtension<EventEngineSupportsFdExtension>( |
|
|
|
|
/*engine=*/args.GetObjectRef<EventEngine>().get()); |
|
|
|
|
if (supports_fd == nullptr) { |
|
|
|
|
return absl::UnimplementedError( |
|
|
|
|
"The server's EventEngine does not support adding endpoints from " |
|
|
|
|
"connected file descriptors."); |
|
|
|
|
} |
|
|
|
|
auto endpoint = |
|
|
|
|
supports_fd->CreateEndpointFromFd(fd, ChannelArgsEndpointConfig(args)); |
|
|
|
|
return AcceptConnectedEndpoint(std::move(endpoint)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void PassiveListenerImpl::ListenerDestroyed() { |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
listener_ = nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace experimental
|
|
|
|
|
} // namespace grpc_core
|
|
|
|
|
|
|
|
|
|
int grpc_server_add_http2_port(grpc_server* server, const char* addr, |
|
|
|
@ -1144,3 +1208,31 @@ void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#endif // GPR_SUPPORT_CHANNELS_FROM_FD
|
|
|
|
|
|
|
|
|
|
absl::Status grpc_server_add_passive_listener( |
|
|
|
|
grpc_core::Server* server, grpc_server_credentials* credentials, |
|
|
|
|
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl> |
|
|
|
|
passive_listener) { |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
GRPC_API_TRACE("grpc_server_add_passive_listener(server=%p, credentials=%p)", |
|
|
|
|
2, (server, credentials)); |
|
|
|
|
// Create security context.
|
|
|
|
|
if (credentials == nullptr) { |
|
|
|
|
return absl::UnavailableError( |
|
|
|
|
"No credentials specified for passive listener"); |
|
|
|
|
} |
|
|
|
|
auto sc = credentials->create_security_connector(grpc_core::ChannelArgs()); |
|
|
|
|
if (sc == nullptr) { |
|
|
|
|
return absl::UnavailableError( |
|
|
|
|
absl::StrCat("Unable to create secure server with credentials of type ", |
|
|
|
|
credentials->type().name())); |
|
|
|
|
} |
|
|
|
|
auto args = server->channel_args() |
|
|
|
|
.SetObject(credentials->Ref()) |
|
|
|
|
.SetObject(std::move(sc)); |
|
|
|
|
passive_listener->listener_ = |
|
|
|
|
grpc_core::Chttp2ServerListener::CreateForPassiveListener( |
|
|
|
|
server, args, passive_listener); |
|
|
|
|
passive_listener->server_ = server->Ref(); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
} |
|
|
|
|