From c0208416b6424ea2c2ee8d6c024916d19b1adf8e Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 9 May 2024 12:51:50 -0700 Subject: [PATCH] Automated rollback of commit 82e5116fb0d7b66570c3a8e8bdd7651646ab7a31. PiperOrigin-RevId: 632241053 --- BUILD | 7 +- CMakeLists.txt | 6 + Makefile | 1 + Package.swift | 1 + build_autogenerated.yaml | 7 + gRPC-C++.podspec | 1 + gRPC-Core.podspec | 1 + grpc.gemspec | 1 + include/grpc/module.modulemap | 1 + include/grpc/passive_listener.h | 62 ++++ include/grpcpp/passive_listener.h | 27 ++ include/grpcpp/security/server_credentials.h | 1 + include/grpcpp/server_builder.h | 28 ++ package.xml | 1 + src/core/BUILD | 2 + .../transport/binder/server/binder_server.cc | 7 +- .../server/chaotic_good_server.cc | 7 +- .../chaotic_good/server/chaotic_good_server.h | 4 +- .../transport/chttp2/server/chttp2_server.cc | 296 ++++++++++++------ .../transport/chttp2/server/chttp2_server.h | 33 ++ .../lib/event_engine/extensions/supports_fd.h | 7 + .../event_engine/posix_engine/posix_engine.cc | 16 + .../event_engine/posix_engine/posix_engine.h | 2 + src/core/server/server.h | 14 +- src/cpp/server/server_builder.cc | 60 ++++ .../event_engine/event_engine_test_utils.h | 65 ++++ test/cpp/server/BUILD | 1 + test/cpp/server/server_builder_test.cc | 55 ++++ .../distrib/check_namespace_qualification.py | 4 + .../check_redundant_namespace_qualifiers.py | 9 + tools/doxygen/Doxyfile.c++ | 2 + tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core | 1 + tools/doxygen/Doxyfile.core.internal | 1 + 34 files changed, 619 insertions(+), 114 deletions(-) create mode 100644 include/grpc/passive_listener.h create mode 100644 include/grpcpp/passive_listener.h diff --git a/BUILD b/BUILD index 81e1ad65cd1..55bb4f4a347 100644 --- a/BUILD +++ b/BUILD @@ -296,6 +296,7 @@ GRPC_PUBLIC_HDRS = [ "include/grpc/grpc_posix.h", "include/grpc/grpc_security.h", "include/grpc/grpc_security_constants.h", + "include/grpc/passive_listener.h", "include/grpc/slice.h", "include/grpc/slice_buffer.h", "include/grpc/status.h", @@ -457,6 +458,7 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/impl/service_type.h", "include/grpcpp/impl/status.h", "include/grpcpp/impl/sync.h", + "include/grpcpp/passive_listener.h", "include/grpcpp/resource_quota.h", "include/grpcpp/security/audit_logging.h", "include/grpcpp/security/tls_crl_provider.h", @@ -883,7 +885,7 @@ grpc_cc_library( grpc_cc_library( name = "grpc_public_hdrs", - hdrs = GRPC_PUBLIC_HDRS, + hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS, external_deps = [ "absl/status:statusor", "absl/strings", @@ -2508,6 +2510,7 @@ grpc_cc_library( "//src/core:grpc_backend_metric_provider", "//src/core:grpc_crl_provider", "//src/core:grpc_service_config", + "//src/core:grpc_transport_chttp2_server", "//src/core:grpc_transport_inproc", "//src/core:json", "//src/core:json_reader", @@ -2566,6 +2569,7 @@ grpc_cc_library( "grpc_security_base", "grpc_service_config_impl", "grpc_trace", + "grpc_transport_chttp2", "grpc_unsecure", "grpcpp_backend_metric_recorder", "grpcpp_call_metric_recorder", @@ -2587,6 +2591,7 @@ grpc_cc_library( "//src/core:grpc_backend_metric_provider", "//src/core:grpc_insecure_credentials", "//src/core:grpc_service_config", + "//src/core:grpc_transport_chttp2_server", "//src/core:grpc_transport_inproc", "//src/core:ref_counted", "//src/core:resource_quota", diff --git a/CMakeLists.txt b/CMakeLists.txt index 787db67baa0..449d17ec767 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2746,6 +2746,7 @@ foreach(_hdr include/grpc/impl/propagation_bits.h include/grpc/impl/slice_type.h include/grpc/load_reporting.h + include/grpc/passive_listener.h include/grpc/slice.h include/grpc/slice_buffer.h include/grpc/status.h @@ -3446,6 +3447,7 @@ foreach(_hdr include/grpc/impl/propagation_bits.h include/grpc/impl/slice_type.h include/grpc/load_reporting.h + include/grpc/passive_listener.h include/grpc/slice.h include/grpc/slice_buffer.h include/grpc/status.h @@ -4314,6 +4316,7 @@ foreach(_hdr include/grpcpp/impl/service_type.h include/grpcpp/impl/status.h include/grpcpp/impl/sync.h + include/grpcpp/passive_listener.h include/grpcpp/resource_quota.h include/grpcpp/security/audit_logging.h include/grpcpp/security/auth_context.h @@ -5054,6 +5057,7 @@ foreach(_hdr include/grpcpp/impl/service_type.h include/grpcpp/impl/status.h include/grpcpp/impl/sync.h + include/grpcpp/passive_listener.h include/grpcpp/resource_quota.h include/grpcpp/security/audit_logging.h include/grpcpp/security/auth_context.h @@ -5504,6 +5508,7 @@ foreach(_hdr include/grpc/impl/propagation_bits.h include/grpc/impl/slice_type.h include/grpc/load_reporting.h + include/grpc/passive_listener.h include/grpc/slice.h include/grpc/slice_buffer.h include/grpc/status.h @@ -26833,6 +26838,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h + test/core/event_engine/event_engine_test_utils.cc test/core/test_util/cmdline.cc test/core/test_util/fuzzer_util.cc test/core/test_util/grpc_profiler.cc diff --git a/Makefile b/Makefile index a62edd7fca3..25ebda3f986 100644 --- a/Makefile +++ b/Makefile @@ -1775,6 +1775,7 @@ PUBLIC_HEADERS_C += \ include/grpc/impl/propagation_bits.h \ include/grpc/impl/slice_type.h \ include/grpc/load_reporting.h \ + include/grpc/passive_listener.h \ include/grpc/slice.h \ include/grpc/slice_buffer.h \ include/grpc/status.h \ diff --git a/Package.swift b/Package.swift index 9a1f76a9dc2..81f99552ac2 100644 --- a/Package.swift +++ b/Package.swift @@ -93,6 +93,7 @@ let package = Package( "include/grpc/impl/propagation_bits.h", "include/grpc/impl/slice_type.h", "include/grpc/load_reporting.h", + "include/grpc/passive_listener.h", "include/grpc/slice.h", "include/grpc/slice_buffer.h", "include/grpc/status.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index dd845a6d959..d15f905f347 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -198,6 +198,7 @@ libs: - include/grpc/impl/propagation_bits.h - include/grpc/impl/slice_type.h - include/grpc/load_reporting.h + - include/grpc/passive_listener.h - include/grpc/slice.h - include/grpc/slice_buffer.h - include/grpc/status.h @@ -2182,6 +2183,7 @@ libs: - include/grpc/impl/propagation_bits.h - include/grpc/impl/slice_type.h - include/grpc/load_reporting.h + - include/grpc/passive_listener.h - include/grpc/slice.h - include/grpc/slice_buffer.h - include/grpc/status.h @@ -3786,6 +3788,7 @@ libs: - include/grpcpp/impl/service_type.h - include/grpcpp/impl/status.h - include/grpcpp/impl/sync.h + - include/grpcpp/passive_listener.h - include/grpcpp/resource_quota.h - include/grpcpp/security/audit_logging.h - include/grpcpp/security/auth_context.h @@ -4213,6 +4216,7 @@ libs: - include/grpcpp/impl/service_type.h - include/grpcpp/impl/status.h - include/grpcpp/impl/sync.h + - include/grpcpp/passive_listener.h - include/grpcpp/resource_quota.h - include/grpcpp/security/audit_logging.h - include/grpcpp/security/auth_context.h @@ -4361,6 +4365,7 @@ libs: - include/grpc/impl/propagation_bits.h - include/grpc/impl/slice_type.h - include/grpc/load_reporting.h + - include/grpc/passive_listener.h - include/grpc/slice.h - include/grpc/slice_buffer.h - include/grpc/status.h @@ -17784,6 +17789,7 @@ targets: build: test language: c++ headers: + - test/core/event_engine/event_engine_test_utils.h - test/core/test_util/cmdline.h - test/core/test_util/evaluate_args_test_util.h - test/core/test_util/fuzzer_util.h @@ -17799,6 +17805,7 @@ targets: - src/proto/grpc/testing/echo_messages.proto - src/proto/grpc/testing/simple_messages.proto - src/proto/grpc/testing/xds/v3/orca_load_report.proto + - test/core/event_engine/event_engine_test_utils.cc - test/core/test_util/cmdline.cc - test/core/test_util/fuzzer_util.cc - test/core/test_util/grpc_profiler.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 94651c55feb..ec37a7d2469 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -176,6 +176,7 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/service_type.h', 'include/grpcpp/impl/status.h', 'include/grpcpp/impl/sync.h', + 'include/grpcpp/passive_listener.h', 'include/grpcpp/resource_quota.h', 'include/grpcpp/security/audit_logging.h', 'include/grpcpp/security/auth_context.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 4e6d835dc2a..5459da7206f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -168,6 +168,7 @@ Pod::Spec.new do |s| 'include/grpc/impl/propagation_bits.h', 'include/grpc/impl/slice_type.h', 'include/grpc/load_reporting.h', + 'include/grpc/passive_listener.h', 'include/grpc/slice.h', 'include/grpc/slice_buffer.h', 'include/grpc/status.h', diff --git a/grpc.gemspec b/grpc.gemspec index 53f5845e405..44f58db3185 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -99,6 +99,7 @@ Gem::Specification.new do |s| s.files += %w( include/grpc/impl/propagation_bits.h ) s.files += %w( include/grpc/impl/slice_type.h ) s.files += %w( include/grpc/load_reporting.h ) + s.files += %w( include/grpc/passive_listener.h ) s.files += %w( include/grpc/slice.h ) s.files += %w( include/grpc/slice_buffer.h ) s.files += %w( include/grpc/status.h ) diff --git a/include/grpc/module.modulemap b/include/grpc/module.modulemap index c9219532826..30427e21d05 100644 --- a/include/grpc/module.modulemap +++ b/include/grpc/module.modulemap @@ -38,6 +38,7 @@ header "byte_buffer.h" header "impl/propagation_bits.h" header "impl/slice_type.h" header "load_reporting.h" + header "passive_listener.h" header "slice.h" header "slice_buffer.h" header "status.h" diff --git a/include/grpc/passive_listener.h b/include/grpc/passive_listener.h new file mode 100644 index 00000000000..fffc789dba0 --- /dev/null +++ b/include/grpc/passive_listener.h @@ -0,0 +1,62 @@ +// Copyright 2024 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_PASSIVE_LISTENER_H +#define GRPC_PASSIVE_LISTENER_H + +#include + +#include +#include +// #include + +namespace grpc_core { +class Server; + +namespace experimental { +class PassiveListenerImpl; + +/// -- EXPERIMENTAL API -- +/// Interface for used for Server Endpoint injection. +class PassiveListener { + public: + virtual ~PassiveListener() = default; + /// -- EXPERIMENTAL API -- + /// + /// Takes an Endpoint for an established connection, and treats it as if the + /// connection had been accepted by the server. + /// + /// The server must be started before endpoints can be accepted. + virtual absl::Status AcceptConnectedEndpoint( + std::unique_ptr + endpoint) = 0; + + /// -- EXPERIMENTAL API -- + /// + /// Takes a connected file descriptor, and treats it as if the server had + /// accepted the connection itself. + /// + /// Returns a failure status if the server's active EventEngine does not + /// support Endpoint creation from fds. + virtual absl::Status AcceptConnectedFd(int fd) = 0; +}; + +} // namespace experimental +} // namespace grpc_core + +absl::Status grpc_server_add_passive_listener( + grpc_core::Server* server, grpc_server_credentials* credentials, + std::shared_ptr + passive_listener); + +#endif /* GRPC_PASSIVE_LISTENER_H */ diff --git a/include/grpcpp/passive_listener.h b/include/grpcpp/passive_listener.h new file mode 100644 index 00000000000..8def9592b5e --- /dev/null +++ b/include/grpcpp/passive_listener.h @@ -0,0 +1,27 @@ +// Copyright 2024 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPCPP_PASSIVE_LISTENER_H +#define GRPCPP_PASSIVE_LISTENER_H + +#include + +namespace grpc { +namespace experimental { + +using grpc_core::experimental::PassiveListener; + +} // namespace experimental +} // namespace grpc + +#endif // GRPCPP_PASSIVE_LISTENER_H diff --git a/include/grpcpp/security/server_credentials.h b/include/grpcpp/security/server_credentials.h index fe8c546190d..e3f5c2bbbfe 100644 --- a/include/grpcpp/security/server_credentials.h +++ b/include/grpcpp/security/server_credentials.h @@ -84,6 +84,7 @@ class ServerCredentials : private grpc::internal::GrpcLibrary { // Needed for access to AddPortToServer. friend class Server; // Needed for access to c_creds_. + friend class ServerBuilder; friend std::shared_ptr grpc::XdsServerCredentials( const std::shared_ptr& fallback_credentials); diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index e6266a90d98..e66ca4301cd 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -25,13 +25,17 @@ #include #include +#include +#include #include #include #include #include #include #include +#include #include +#include #include #include #include @@ -291,6 +295,18 @@ class ServerBuilder { void EnableCallMetricRecording( experimental::ServerMetricRecorder* server_metric_recorder = nullptr); + // Creates a passive listener for Server Endpoint injection. + /// + /// \a PasiveListener lets applications provide pre-established connections + /// to gRPC Servers. The server will behave as if it accepted the connection + /// itself on its own listening addresses. + /// + /// This can be called multiple times to create passive listeners with + /// different server credentials. + ServerBuilder& AddPassiveListener( + std::shared_ptr creds, + std::unique_ptr& passive_listener); + private: ServerBuilder* builder_; }; @@ -364,6 +380,17 @@ class ServerBuilder { private: friend class grpc::testing::ServerBuilderPluginTest; + struct UnstartedPassiveListener { + std::weak_ptr + passive_listener; + std::shared_ptr credentials; + UnstartedPassiveListener( + std::weak_ptr listener, + std::shared_ptr creds) + : passive_listener(std::move(listener)), + credentials(std::move(creds)) {} + }; + struct SyncServerSettings { SyncServerSettings() : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} @@ -388,6 +415,7 @@ class ServerBuilder { std::vector> options_; std::vector> services_; std::vector ports_; + std::vector unstarted_passive_listeners_; SyncServerSettings sync_server_settings_; diff --git a/package.xml b/package.xml index e7600613939..70f64915ef8 100644 --- a/package.xml +++ b/package.xml @@ -81,6 +81,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index ec15ddb027c..ddb7e1d6669 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6892,6 +6892,8 @@ grpc_cc_library( "connection_quota", "error", "error_utils", + "event_engine_extensions", + "event_engine_query_extensions", "grpc_insecure_credentials", "handshaker_registry", "iomgr_fwd", diff --git a/src/core/ext/transport/binder/server/binder_server.cc b/src/core/ext/transport/binder/server/binder_server.cc index faf1641d7d1..6f4d932f436 100644 --- a/src/core/ext/transport/binder/server/binder_server.cc +++ b/src/core/ext/transport/binder/server/binder_server.cc @@ -160,7 +160,7 @@ class BinderServerListener : public Server::ListenerInterface { on_destroy_done_ = on_destroy_done; } - void Orphan() override { delete this; } + void Orphan() override { Unref(); } ~BinderServerListener() override { ExecCtx::Get()->Flush(); @@ -240,9 +240,8 @@ bool AddBinderPort(const std::string& addr, grpc_server* server, } std::string conn_id = addr.substr(kBinderUriScheme.size()); Server* core_server = Server::FromC(server); - core_server->AddListener( - OrphanablePtr(new BinderServerListener( - core_server, conn_id, std::move(factory), security_policy))); + core_server->AddListener(MakeOrphanable( + core_server, conn_id, std::move(factory), security_policy)); return true; } diff --git a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc index e00bcc1b379..7dd94de3090 100644 --- a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc +++ b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc @@ -103,8 +103,8 @@ absl::StatusOr ChaoticGoodServerListener::Bind( str.ok() ? str->c_str() : str.status().ToString().c_str()); } EventEngine::Listener::AcceptCallback accept_cb = - [self = Ref()](std::unique_ptr ep, - MemoryAllocator) { + [self = RefAsSubclass()]( + std::unique_ptr ep, MemoryAllocator) { ExecCtx exec_ctx; MutexLock lock(&self->mu_); if (self->shutdown_) return; @@ -149,7 +149,8 @@ absl::Status ChaoticGoodServerListener::StartListening() { ChaoticGoodServerListener::ActiveConnection::ActiveConnection( RefCountedPtr listener, std::unique_ptr endpoint) - : memory_allocator_(listener->memory_allocator_), listener_(listener) { + : memory_allocator_(listener->memory_allocator_), + listener_(std::move(listener)) { handshaking_state_ = MakeRefCounted(Ref()); handshaking_state_->Start(std::move(endpoint)); } diff --git a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h index 404bbbf946d..0479016c15b 100644 --- a/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h +++ b/src/core/ext/transport/chaotic_good/server/chaotic_good_server.h @@ -49,9 +49,7 @@ namespace grpc_core { namespace chaotic_good { -class ChaoticGoodServerListener final - : public Server::ListenerInterface, - public RefCounted { +class ChaoticGoodServerListener final : public Server::ListenerInterface { public: static absl::AnyInvocable DefaultConnectionIDGenerator() { return [bitgen = absl::BitGen()]() mutable { diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 9b7770e4930..1c2ab26dc24 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -58,6 +59,8 @@ #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/event_engine/extensions/supports_fd.h" +#include "src/core/lib/event_engine/query_extensions.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -67,6 +70,7 @@ #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -93,9 +97,11 @@ #endif // GPR_SUPPORT_CHANNELS_FROM_FD namespace grpc_core { -namespace { -using ::grpc_event_engine::experimental::EventEngine; +using grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using grpc_event_engine::experimental::EventEngine; +using grpc_event_engine::experimental::EventEngineSupportsFdExtension; +using grpc_event_engine::experimental::QueryExtension; const char kUnixUriPrefix[] = "unix:"; const char kUnixAbstractUriPrefix[] = "unix-abstract:"; @@ -112,14 +118,23 @@ class Chttp2ServerListener : public Server::ListenerInterface { Server* server, const char* name, const ChannelArgs& args, Chttp2ServerArgsModifier args_modifier); + static Chttp2ServerListener* CreateForPassiveListener( + Server* server, const ChannelArgs& args, + std::shared_ptr passive_listener); + // Do not instantiate directly. Use one of the factory methods above. Chttp2ServerListener(Server* server, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier); + Chttp2ServerArgsModifier args_modifier, + grpc_server_config_fetcher* config_fetcher, + std::shared_ptr + passive_listener = nullptr); ~Chttp2ServerListener() override; void Start(Server* server, const std::vector* pollsets) override; + void AcceptConnectedEndpoint(std::unique_ptr endpoint); + channelz::ListenSocketNode* channelz_listen_socket_node() const override { return channelz_listen_socket_.get(); } @@ -129,6 +144,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { void Orphan() override; private: + friend class experimental::PassiveListenerImpl; + class ConfigFetcherWatcher : public grpc_server_config_fetcher::WatcherInterface { public: @@ -235,34 +252,8 @@ class Chttp2ServerListener : public Server::ListenerInterface { static void DestroyListener(Server* /*server*/, void* arg, grpc_closure* destroy_done); - // The interface required by RefCountedPtr<> has been manually implemented - // here to take a ref on tcp_server_ instead. Note that, the handshaker - // needs tcp_server_ to exist for the lifetime of the handshake since it's - // needed by acceptor. Sharing refs between the listener and tcp_server_ is - // just an optimization to avoid taking additional refs on the listener, - // since TcpServerShutdownComplete already holds a ref to the listener. - void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); } - void IncrementRefCount(const DebugLocation& /* location */, - const char* /* reason */) { - IncrementRefCount(); - } - - GRPC_MUST_USE_RESULT RefCountedPtr Ref() { - IncrementRefCount(); - return RefCountedPtr(this); - } - GRPC_MUST_USE_RESULT RefCountedPtr Ref( - const DebugLocation& /* location */, const char* /* reason */) { - return Ref(); - } - - void Unref() { grpc_tcp_server_unref(tcp_server_); } - void Unref(const DebugLocation& /* location */, const char* /* reason */) { - Unref(); - } - - Server* const server_; - grpc_tcp_server* tcp_server_; + Server* const server_ = nullptr; + grpc_tcp_server* tcp_server_ = nullptr; grpc_resolved_address resolved_address_; Chttp2ServerArgsModifier const args_modifier_; ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; @@ -285,6 +276,10 @@ class Chttp2ServerListener : public Server::ListenerInterface { RefCountedPtr channelz_listen_socket_; MemoryQuotaRefPtr memory_quota_; ConnectionQuotaRefPtr connection_quota_; + grpc_server_config_fetcher* config_fetcher_ = nullptr; + // TODO(yashykt): consider using absl::variant<> to minimize memory usage for + // disjoint cases where different fields are used. + std::shared_ptr passive_listener_; }; // @@ -381,13 +376,17 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState( handshake_mgr_(MakeRefCounted()), deadline_(GetConnectionDeadline(args)), interested_parties_(grpc_pollset_set_create()) { - grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); + if (accepting_pollset != nullptr) { + grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_); + } CoreConfiguration::Get().handshaker_registry().AddHandshakers( HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get()); } Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { - grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); + if (accepting_pollset_ != nullptr) { + grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_); + } grpc_pollset_set_destroy(interested_parties_); gpr_free(acceptor_); } @@ -589,7 +588,11 @@ Chttp2ServerListener::ActiveConnection::ActiveConnection( grpc_schedule_on_exec_ctx); } -Chttp2ServerListener::ActiveConnection::~ActiveConnection() {} +Chttp2ServerListener::ActiveConnection::~ActiveConnection() { + if (listener_ != nullptr && listener_->tcp_server_ != nullptr) { + grpc_tcp_server_unref(listener_->tcp_server_); + } +} void Chttp2ServerListener::ActiveConnection::Orphan() { OrphanablePtr handshaking_state; @@ -637,6 +640,9 @@ void Chttp2ServerListener::ActiveConnection::Start( const ChannelArgs& args) { RefCountedPtr handshaking_state_ref; listener_ = std::move(listener); + if (listener_->tcp_server_ != nullptr) { + grpc_tcp_server_ref(listener_->tcp_server_); + } { ReleasableMutexLock lock(&mu_); if (shutdown_) { @@ -709,83 +715,82 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() { grpc_error_handle Chttp2ServerListener::Create( Server* server, grpc_resolved_address* addr, const ChannelArgs& args, Chttp2ServerArgsModifier args_modifier, int* port_num) { - Chttp2ServerListener* listener = nullptr; - // The bulk of this method is inside of a lambda to make cleanup - // easier without using goto. - grpc_error_handle error = [&]() { - grpc_error_handle error; - // Create Chttp2ServerListener. - listener = new Chttp2ServerListener(server, args, args_modifier); - error = grpc_tcp_server_create( - &listener->tcp_server_shutdown_complete_, - grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), - OnAccept, listener, &listener->tcp_server_); + // Create Chttp2ServerListener. + OrphanablePtr listener = + MakeOrphanable(server, args, args_modifier, + server->config_fetcher()); + // The tcp_server will be unreffed when the listener is orphaned, which could + // be at the end of this function if the listener was not added to the + // server's set of listeners. + grpc_error_handle error = grpc_tcp_server_create( + &listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), + OnAccept, listener.get(), &listener->tcp_server_); + if (!error.ok()) return error; + if (listener->config_fetcher_ != nullptr) { + listener->resolved_address_ = *addr; + // TODO(yashykt): Consider binding so as to be able to return the port + // number. + } else { + error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num); if (!error.ok()) return error; - if (server->config_fetcher() != nullptr) { - listener->resolved_address_ = *addr; - // TODO(yashykt): Consider binding so as to be able to return the port - // number. - } else { - error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num); - if (!error.ok()) return error; - } - // Create channelz node. - if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) - .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { - auto string_address = grpc_sockaddr_to_uri(addr); - if (!string_address.ok()) { - return GRPC_ERROR_CREATE(string_address.status().ToString()); - } - listener->channelz_listen_socket_ = - MakeRefCounted( - *string_address, - absl::StrCat("chttp2 listener ", *string_address)); - } - // Register with the server only upon success - server->AddListener(OrphanablePtr(listener)); - return absl::OkStatus(); - }(); - if (!error.ok()) { - if (listener != nullptr) { - if (listener->tcp_server_ != nullptr) { - // listener is deleted when tcp_server_ is shutdown. - grpc_tcp_server_unref(listener->tcp_server_); - } else { - delete listener; - } + } + // Create channelz node. + if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) + .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { + auto string_address = grpc_sockaddr_to_uri(addr); + if (!string_address.ok()) { + return GRPC_ERROR_CREATE(string_address.status().ToString()); } + listener->channelz_listen_socket_ = + MakeRefCounted( + *string_address, absl::StrCat("chttp2 listener ", *string_address)); } - return error; + // Register with the server only upon success + server->AddListener(std::move(listener)); + return absl::OkStatus(); } grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( Server* server, const char* name, const ChannelArgs& args, Chttp2ServerArgsModifier args_modifier) { - Chttp2ServerListener* listener = - new Chttp2ServerListener(server, args, args_modifier); + auto listener = MakeOrphanable( + server, args, args_modifier, server->config_fetcher()); grpc_error_handle error = grpc_tcp_server_create( - &listener->tcp_server_shutdown_complete_, - grpc_event_engine::experimental::ChannelArgsEndpointConfig(args), - OnAccept, listener, &listener->tcp_server_); - if (!error.ok()) { - delete listener; - return error; - } + &listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), + OnAccept, listener.get(), &listener->tcp_server_); + if (!error.ok()) return error; // TODO(yangg) channelz TcpServerFdHandler** arg_val = args.GetPointer(name); *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_); - server->AddListener(OrphanablePtr(listener)); + server->AddListener(std::move(listener)); return absl::OkStatus(); } +Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener( + Server* server, const ChannelArgs& args, + std::shared_ptr passive_listener) { + // TODO(hork): figure out how to handle channelz in this case + auto listener = MakeOrphanable( + server, args, /*args_modifier=*/ + [](const ChannelArgs& args, grpc_error_handle*) { return args; }, nullptr, + std::move(passive_listener)); + auto listener_ptr = listener.get(); + server->AddListener(std::move(listener)); + return listener_ptr; +} + Chttp2ServerListener::Chttp2ServerListener( Server* server, const ChannelArgs& args, - Chttp2ServerArgsModifier args_modifier) + Chttp2ServerArgsModifier args_modifier, + grpc_server_config_fetcher* config_fetcher, + std::shared_ptr passive_listener) : server_(server), args_modifier_(args_modifier), args_(args), memory_quota_(args.GetObject()->memory_quota()), - connection_quota_(MakeRefCounted()) { + connection_quota_(MakeRefCounted()), + config_fetcher_(config_fetcher), + passive_listener_(std::move(passive_listener)) { auto max_allowed_incoming_connections = args.GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS); if (max_allowed_incoming_connections.has_value()) { @@ -800,6 +805,9 @@ Chttp2ServerListener::~Chttp2ServerListener() { // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. ExecCtx::Get()->Flush(); + if (passive_listener_ != nullptr) { + passive_listener_->ListenerDestroyed(); + } if (on_destroy_done_ != nullptr) { ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus()); ExecCtx::Get()->Flush(); @@ -809,10 +817,11 @@ Chttp2ServerListener::~Chttp2ServerListener() { // Server callback: start listening on our ports void Chttp2ServerListener::Start( Server* /*server*/, const std::vector* /* pollsets */) { - if (server_->config_fetcher() != nullptr) { - auto watcher = std::make_unique(Ref()); + if (config_fetcher_ != nullptr) { + auto watcher = std::make_unique( + RefAsSubclass()); config_fetcher_watcher_ = watcher.get(); - server_->config_fetcher()->StartWatch( + config_fetcher_->StartWatch( grpc_sockaddr_to_string(&resolved_address_, false).value(), std::move(watcher)); } else { @@ -826,7 +835,9 @@ void Chttp2ServerListener::Start( } void Chttp2ServerListener::StartListening() { - grpc_tcp_server_start(tcp_server_, &server_->pollsets()); + if (tcp_server_ != nullptr) { + grpc_tcp_server_start(tcp_server_, &server_->pollsets()); + } } void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { @@ -834,6 +845,12 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { on_destroy_done_ = on_destroy_done; } +void Chttp2ServerListener::AcceptConnectedEndpoint( + std::unique_ptr endpoint) { + OnAccept(this, grpc_event_engine_endpoint_create(std::move(endpoint)), + /*accepting_pollset=*/nullptr, /*acceptor=*/nullptr); +} + void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, grpc_pollset* accepting_pollset, grpc_tcp_server_acceptor* acceptor) { @@ -858,7 +875,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, endpoint_cleanup(error); return; } - if (self->server_->config_fetcher() != nullptr) { + if (self->config_fetcher_ != nullptr) { if (connection_manager == nullptr) { grpc_error_handle error = GRPC_ERROR_CREATE( "No ConnectionManager configured. Closing connection."); @@ -899,7 +916,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, // heap-use-after-free issues where `Ref()` is invoked when the ref of // tcp_server_ has already reached 0. (Ref() implementation of // Chttp2ServerListener is grpc_tcp_server_ref().) - listener_ref = self->Ref(); + listener_ref = self->RefAsSubclass(); self->connections_.emplace(connection.get(), std::move(connection)); } } @@ -914,7 +931,7 @@ void Chttp2ServerListener::TcpServerShutdownComplete( void* arg, grpc_error_handle /*error*/) { Chttp2ServerListener* self = static_cast(arg); self->channelz_listen_socket_.reset(); - delete self; + self->Unref(); } // Server callback: destroy the tcp listener (so we don't generate further @@ -923,7 +940,8 @@ void Chttp2ServerListener::Orphan() { // Cancel the watch before shutting down so as to avoid holding a ref to the // listener in the watcher. if (config_fetcher_watcher_ != nullptr) { - server_->config_fetcher()->CancelWatch(config_fetcher_watcher_); + CHECK_NE(config_fetcher_, nullptr); + config_fetcher_->CancelWatch(config_fetcher_watcher_); } std::map> connections; grpc_tcp_server* tcp_server; @@ -941,12 +959,14 @@ void Chttp2ServerListener::Orphan() { } tcp_server = tcp_server_; } - grpc_tcp_server_shutdown_listeners(tcp_server); - grpc_tcp_server_unref(tcp_server); + if (tcp_server != nullptr) { + grpc_tcp_server_shutdown_listeners(tcp_server); + grpc_tcp_server_unref(tcp_server); + } else { + Unref(); + } } -} // namespace - // // Chttp2ServerAddPort() // @@ -1047,6 +1067,50 @@ ChannelArgs ModifyArgsForConnection(const ChannelArgs& args, } } // namespace + +namespace experimental { + +absl::Status PassiveListenerImpl::AcceptConnectedEndpoint( + std::unique_ptr endpoint) { + CHECK_NE(server_.get(), nullptr); + RefCountedPtr listener; + { + MutexLock lock(&mu_); + if (listener_ != nullptr) { + listener = + listener_->RefIfNonZero().TakeAsSubclass(); + } + } + if (listener == nullptr) { + return absl::UnavailableError("passive listener already shut down"); + } + ExecCtx exec_ctx; + listener->AcceptConnectedEndpoint(std::move(endpoint)); + return absl::OkStatus(); +} + +absl::Status PassiveListenerImpl::AcceptConnectedFd(int fd) { + CHECK_NE(server_.get(), nullptr); + ExecCtx exec_ctx; + auto& args = server_->channel_args(); + auto* supports_fd = QueryExtension( + /*engine=*/args.GetObjectRef().get()); + if (supports_fd == nullptr) { + return absl::UnimplementedError( + "The server's EventEngine does not support adding endpoints from " + "connected file descriptors."); + } + auto endpoint = + supports_fd->CreateEndpointFromFd(fd, ChannelArgsEndpointConfig(args)); + return AcceptConnectedEndpoint(std::move(endpoint)); +} + +void PassiveListenerImpl::ListenerDestroyed() { + MutexLock lock(&mu_); + listener_ = nullptr; +} + +} // namespace experimental } // namespace grpc_core int grpc_server_add_http2_port(grpc_server* server, const char* addr, @@ -1144,3 +1208,31 @@ void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */, } #endif // GPR_SUPPORT_CHANNELS_FROM_FD + +absl::Status grpc_server_add_passive_listener( + grpc_core::Server* server, grpc_server_credentials* credentials, + std::shared_ptr + passive_listener) { + grpc_core::ExecCtx exec_ctx; + GRPC_API_TRACE("grpc_server_add_passive_listener(server=%p, credentials=%p)", + 2, (server, credentials)); + // Create security context. + if (credentials == nullptr) { + return absl::UnavailableError( + "No credentials specified for passive listener"); + } + auto sc = credentials->create_security_connector(grpc_core::ChannelArgs()); + if (sc == nullptr) { + return absl::UnavailableError( + absl::StrCat("Unable to create secure server with credentials of type ", + credentials->type().name())); + } + auto args = server->channel_args() + .SetObject(credentials->Ref()) + .SetObject(std::move(sc)); + passive_listener->listener_ = + grpc_core::Chttp2ServerListener::CreateForPassiveListener( + server, args, passive_listener); + passive_listener->server_ = server->Ref(); + return absl::OkStatus(); +} diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h index 26c178be917..25d38236aed 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.h +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -21,6 +21,7 @@ #include +#include #include #include "src/core/lib/channel/channel_args.h" @@ -42,6 +43,38 @@ grpc_error_handle Chttp2ServerAddPort( Server* server, const char* addr, const ChannelArgs& args, Chttp2ServerArgsModifier connection_args_modifier, int* port_num); +class Chttp2ServerListener; +namespace experimental { + +// An implementation of the public C++ passive listener interface. +// The server builder holds a weak_ptr to one of these objects, and the +// application owns the instance. +class PassiveListenerImpl final : public PassiveListener { + public: + absl::Status AcceptConnectedEndpoint( + std::unique_ptr + endpoint) override ABSL_LOCKS_EXCLUDED(mu_); + + absl::Status AcceptConnectedFd(GRPC_UNUSED int fd) override + ABSL_LOCKS_EXCLUDED(mu_); + + void ListenerDestroyed() ABSL_LOCKS_EXCLUDED(mu_); + + private: + // note: the grpc_core::Server redundant namespace qualification is + // required for older gcc versions. + friend absl::Status(::grpc_server_add_passive_listener)( + grpc_core::Server* server, grpc_server_credentials* credentials, + std::shared_ptr + passive_listener); + + Mutex mu_; + // Data members will be populated when initialized. + RefCountedPtr server_; + Chttp2ServerListener* listener_; +}; + +} // namespace experimental } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H diff --git a/src/core/lib/event_engine/extensions/supports_fd.h b/src/core/lib/event_engine/extensions/supports_fd.h index 66ea1c2b345..30f0d2ad0e6 100644 --- a/src/core/lib/event_engine/extensions/supports_fd.h +++ b/src/core/lib/event_engine/extensions/supports_fd.h @@ -112,6 +112,13 @@ class EventEngineSupportsFdExtension { int fd, const EndpointConfig& config, MemoryAllocator memory_allocator) = 0; + /// Creates an EventEngine::Endpoint from an fd which is already assumed to be + /// connected to a remote peer. See \a CreatePosixEndpointFromFd for details. + /// This has the same behavior, but the \a memory_allocator is taken from the + /// EndpointConfig's resource quota. + virtual std::unique_ptr CreateEndpointFromFd( + int fd, const EndpointConfig& config) = 0; + /// Called when the posix listener has accepted a new client connection. /// \a listener_fd - The listening socket fd that accepted the new client /// connection. diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index e96c969617c..e4d5deb73dc 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -677,6 +677,22 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd, #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING } +std::unique_ptr PosixEventEngine::CreateEndpointFromFd( + int fd, const EndpointConfig& config) { + auto options = TcpOptionsFromEndpointConfig(config); + MemoryAllocator allocator; + if (options.memory_allocator_factory != nullptr) { + return CreatePosixEndpointFromFd( + fd, config, + options.memory_allocator_factory->CreateMemoryAllocator( + absl::StrCat("allocator:", fd))); + } + return CreatePosixEndpointFromFd( + fd, config, + options.resource_quota->memory_quota()->CreateMemoryAllocator( + absl::StrCat("allocator:", fd))); +} + absl::StatusOr> PosixEventEngine::CreateListener( Listener::AcceptCallback on_accept, diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index 257cd6b34b5..ea426c379c9 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -172,6 +172,8 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport, std::unique_ptr CreatePosixEndpointFromFd( int fd, const EndpointConfig& config, MemoryAllocator memory_allocator) override; + std::unique_ptr CreateEndpointFromFd( + int fd, const EndpointConfig& config) override; absl::StatusOr> CreateListener( Listener::AcceptCallback on_accept, diff --git a/src/core/server/server.h b/src/core/server/server.h index 486892b14d8..a4353794a37 100644 --- a/src/core/server/server.h +++ b/src/core/server/server.h @@ -39,6 +39,7 @@ #include "absl/types/optional.h" #include +#include #include #include #include @@ -74,6 +75,9 @@ "grpc.server.max_pending_requests_hard_limit" namespace grpc_core { +namespace experimental { +class PassiveListenerImpl; +} // namespace experimental extern TraceFlag grpc_server_channel_trace; @@ -112,7 +116,7 @@ class Server : public ServerInterface, /// Interface for listeners. /// Implementations must override the Orphan() method, which should stop /// listening and initiate destruction of the listener. - class ListenerInterface : public Orphanable { + class ListenerInterface : public InternallyRefCounted { public: ~ListenerInterface() override = default; @@ -212,6 +216,14 @@ class Server : public ServerInterface, void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); private: + // note: the grpc_core::Server redundant namespace qualification is + // required for older gcc versions. + // TODO(yashykt): eliminate this friend statement as part of your upcoming + // server listener refactoring. + friend absl::Status(::grpc_server_add_passive_listener)( + grpc_core::Server* server, grpc_server_credentials* credentials, + std::shared_ptr + passive_listener); struct RequestedCall; class RequestMatcherInterface; diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index f989703fe1a..b9f71915981 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -47,11 +48,38 @@ #include #include +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/server/server.h" #include "src/cpp/server/external_connection_acceptor_impl.h" namespace grpc { +namespace { + +// A PIMPL wrapper class that owns the only ref to the passive listener +// implementation. This is returned to the application. +class PassiveListenerOwner final + : public grpc_core::experimental::PassiveListener { + public: + explicit PassiveListenerOwner(std::shared_ptr listener) + : listener_(std::move(listener)) {} + + absl::Status AcceptConnectedEndpoint( + std::unique_ptr + endpoint) override { + return listener_->AcceptConnectedEndpoint(std::move(endpoint)); + } + + absl::Status AcceptConnectedFd(int fd) override { + return listener_->AcceptConnectedFd(fd); + } + + private: + std::shared_ptr listener_; +}; + +} // namespace static std::vector (*)()>* g_plugin_factory_list; @@ -225,6 +253,18 @@ ServerBuilder& ServerBuilder::SetResourceQuota( return *this; } +ServerBuilder& ServerBuilder::experimental_type::AddPassiveListener( + std::shared_ptr creds, + std::unique_ptr& passive_listener) { + auto core_passive_listener = + std::make_shared(); + builder_->unstarted_passive_listeners_.emplace_back(core_passive_listener, + std::move(creds)); + passive_listener = + std::make_unique(std::move(core_passive_listener)); + return *builder_; +} + ServerBuilder& ServerBuilder::AddListeningPort( const std::string& addr_uri, std::shared_ptr creds, int* selected_port) { @@ -398,6 +438,26 @@ std::unique_ptr ServerBuilder::BuildAndStart() { cq->RegisterServer(server.get()); } + for (auto& unstarted_listener : unstarted_passive_listeners_) { + has_frequently_polled_cqs = true; + auto passive_listener = unstarted_listener.passive_listener.lock(); + auto* core_server = grpc_core::Server::FromC(server->c_server()); + if (passive_listener != nullptr) { + auto* creds = unstarted_listener.credentials->c_creds(); + if (creds == nullptr) { + gpr_log(GPR_ERROR, "Credentials missing for PassiveListener"); + return nullptr; + } + auto success = grpc_server_add_passive_listener( + core_server, creds, std::move(passive_listener)); + if (!success.ok()) { + gpr_log(GPR_ERROR, "Failed to create a passive listener: %s", + success.ToString().c_str()); + return nullptr; + } + } + } + if (!has_frequently_polled_cqs) { gpr_log(GPR_ERROR, "At least one of the completion queues must be frequently polled"); diff --git a/test/core/event_engine/event_engine_test_utils.h b/test/core/event_engine/event_engine_test_utils.h index 67bbe42d35d..debef7a0449 100644 --- a/test/core/event_engine/event_engine_test_utils.h +++ b/test/core/event_engine/event_engine_test_utils.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,70 @@ class NotifyOnDelete { grpc_core::Notification* signal_; }; +// An endpoint implementation that supports Read and Write via std::threads. +// Passing a grpc_core::Notification will allow owners to know when all +// in-flight callbacks have been run, and all endpoint state has been destroyed. +class ThreadedNoopEndpoint : public EventEngine::Endpoint { + public: + explicit ThreadedNoopEndpoint(grpc_core::Notification* destroyed) + : state_(std::make_shared(destroyed)) {} + ~ThreadedNoopEndpoint() override { + std::thread deleter([state = state_]() { + CleanupThread(state->read); + CleanupThread(state->write); + }); + deleter.detach(); + } + + bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const ReadArgs* /* args */) override { + buffer->Clear(); + CleanupThread(state_->read); + state_->read = new std::thread([cb = std::move(on_read)]() mutable { + cb(absl::UnknownError("test")); + }); + return false; + } + + bool Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* /* args */) override { + data->Clear(); + CleanupThread(state_->write); + state_->write = new std::thread([cb = std::move(on_writable)]() mutable { + cb(absl::UnknownError("test")); + }); + return false; + } + + const EventEngine::ResolvedAddress& GetPeerAddress() const override { + return peer_; + } + + const EventEngine::ResolvedAddress& GetLocalAddress() const override { + return local_; + } + + private: + struct EndpointState { + explicit EndpointState(grpc_core::Notification* deleter) + : delete_notifier_(deleter) {} + std::thread* read = nullptr; + std::thread* write = nullptr; + NotifyOnDelete delete_notifier_; + }; + + static void CleanupThread(std::thread* thd) { + if (thd != nullptr) { + thd->join(); + delete thd; + } + } + + std::shared_ptr state_; + EventEngine::ResolvedAddress peer_; + EventEngine::ResolvedAddress local_; +}; + } // namespace experimental } // namespace grpc_event_engine diff --git a/test/cpp/server/BUILD b/test/cpp/server/BUILD index ba2f115ee7a..b65ec5c9e86 100644 --- a/test/cpp/server/BUILD +++ b/test/cpp/server/BUILD @@ -28,6 +28,7 @@ grpc_cc_test( deps = [ "//:grpc++_unsecure", "//src/proto/grpc/testing:echo_proto", + "//test/core/event_engine:event_engine_test_utils", "//test/core/test_util:grpc_test_util_base", "//test/core/test_util:grpc_test_util_unsecure", ], diff --git a/test/cpp/server/server_builder_test.cc b/test/cpp/server/server_builder_test.cc index 1765a1d59bf..089f7c38ecf 100644 --- a/test/cpp/server/server_builder_test.cc +++ b/test/cpp/server/server_builder_test.cc @@ -16,14 +16,19 @@ // // +#include + #include +#include #include #include #include #include +#include "src/core/lib/gprpp/notification.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/event_engine/event_engine_test_utils.h" #include "test/core/test_util/port.h" #include "test/core/test_util/test_config.h" @@ -83,6 +88,56 @@ TEST_F(ServerBuilderTest, CreateServerRepeatedPortWithDisallowedReusePort) { nullptr); } +TEST_F(ServerBuilderTest, AddPassiveListener) { + std::unique_ptr passive_listener; + auto server = + ServerBuilder() + .experimental() + .AddPassiveListener(InsecureServerCredentials(), passive_listener) + .BuildAndStart(); + server->Shutdown(); +} + +TEST_F(ServerBuilderTest, PassiveListenerAcceptConnectedFd) { + std::unique_ptr passive_listener; + ServerBuilder builder; + auto cq = builder.AddCompletionQueue(); + // TODO(hork): why is the service necessary? Queue isn't drained otherwise. + auto server = + builder.RegisterService(&g_service) + .experimental() + .AddPassiveListener(InsecureServerCredentials(), passive_listener) + .BuildAndStart(); + ASSERT_NE(server.get(), nullptr); +#ifdef GPR_SUPPORT_CHANNELS_FROM_FD + int fd = socket(AF_INET, SOCK_STREAM, 0); + auto accept_status = passive_listener->AcceptConnectedFd(fd); + ASSERT_TRUE(accept_status.ok()) << accept_status; +#else + int fd = -1; + auto accept_status = passive_listener->AcceptConnectedFd(fd); + ASSERT_FALSE(accept_status.ok()) << accept_status; +#endif + server->Shutdown(); +} + +TEST_F(ServerBuilderTest, PassiveListenerAcceptConnectedEndpoint) { + std::unique_ptr passive_listener; + auto server = + ServerBuilder() + .experimental() + .AddPassiveListener(InsecureServerCredentials(), passive_listener) + .BuildAndStart(); + grpc_core::Notification endpoint_destroyed; + auto success = passive_listener->AcceptConnectedEndpoint( + std::make_unique( + &endpoint_destroyed)); + ASSERT_TRUE(success.ok()) + << "AcceptConnectedEndpoint failure: " << success.ToString(); + endpoint_destroyed.WaitForNotification(); + server->Shutdown(); +} + } // namespace } // namespace grpc diff --git a/tools/distrib/check_namespace_qualification.py b/tools/distrib/check_namespace_qualification.py index 6634d8b8830..51ccb71aeb8 100755 --- a/tools/distrib/check_namespace_qualification.py +++ b/tools/distrib/check_namespace_qualification.py @@ -77,6 +77,10 @@ IGNORED_FILES = [ "src/core/lib/gprpp/global_config_env.h", "src/core/lib/profiling/timers.h", "src/core/lib/gprpp/crash.h", + # The grpc_core::Server redundant namespace qualification is required for + # older gcc versions. + "src/core/ext/transport/chttp2/server/chttp2_server.h", + "src/core/server/server.h", ] # find our home diff --git a/tools/distrib/check_redundant_namespace_qualifiers.py b/tools/distrib/check_redundant_namespace_qualifiers.py index 0322332209b..72a5d8b7a3c 100755 --- a/tools/distrib/check_redundant_namespace_qualifiers.py +++ b/tools/distrib/check_redundant_namespace_qualifiers.py @@ -21,6 +21,13 @@ import os import re import sys +IGNORED_FILES = [ + # note: the grpc_core::Server redundant namespace qualification is required + # for older gcc versions. + "src/core/ext/transport/chttp2/server/chttp2_server.h", + "src/core/server/server.h", +] + def find_closing_mustache(contents, initial_depth): """Find the closing mustache for a given number of open mustaches.""" @@ -166,6 +173,8 @@ for config in _CONFIGURATION: for file in files: if file.endswith(".cc") or file.endswith(".h"): path = os.path.join(root, file) + if path in IGNORED_FILES: + continue try: with open(path) as f: contents = f.read() diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 6a50207e16d..299c480565d 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -928,6 +928,7 @@ include/grpc/impl/grpc_types.h \ include/grpc/impl/propagation_bits.h \ include/grpc/impl/slice_type.h \ include/grpc/load_reporting.h \ +include/grpc/passive_listener.h \ include/grpc/slice.h \ include/grpc/slice_buffer.h \ include/grpc/status.h \ @@ -1040,6 +1041,7 @@ include/grpcpp/impl/server_initializer.h \ include/grpcpp/impl/service_type.h \ include/grpcpp/impl/status.h \ include/grpcpp/impl/sync.h \ +include/grpcpp/passive_listener.h \ include/grpcpp/resource_quota.h \ include/grpcpp/security/audit_logging.h \ include/grpcpp/security/auth_context.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 8dd3bb12622..02efb7460b9 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -928,6 +928,7 @@ include/grpc/impl/grpc_types.h \ include/grpc/impl/propagation_bits.h \ include/grpc/impl/slice_type.h \ include/grpc/load_reporting.h \ +include/grpc/passive_listener.h \ include/grpc/slice.h \ include/grpc/slice_buffer.h \ include/grpc/status.h \ @@ -1040,6 +1041,7 @@ include/grpcpp/impl/server_initializer.h \ include/grpcpp/impl/service_type.h \ include/grpcpp/impl/status.h \ include/grpcpp/impl/sync.h \ +include/grpcpp/passive_listener.h \ include/grpcpp/resource_quota.h \ include/grpcpp/security/audit_logging.h \ include/grpcpp/security/auth_context.h \ diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core index 592e357f54f..20b553dadfe 100644 --- a/tools/doxygen/Doxyfile.core +++ b/tools/doxygen/Doxyfile.core @@ -861,6 +861,7 @@ include/grpc/impl/grpc_types.h \ include/grpc/impl/propagation_bits.h \ include/grpc/impl/slice_type.h \ include/grpc/load_reporting.h \ +include/grpc/passive_listener.h \ include/grpc/slice.h \ include/grpc/slice_buffer.h \ include/grpc/status.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 74fd1131517..f98a7a95a21 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -861,6 +861,7 @@ include/grpc/impl/grpc_types.h \ include/grpc/impl/propagation_bits.h \ include/grpc/impl/slice_type.h \ include/grpc/load_reporting.h \ +include/grpc/passive_listener.h \ include/grpc/slice.h \ include/grpc/slice_buffer.h \ include/grpc/status.h \