diff --git a/CMakeLists.txt b/CMakeLists.txt index 3622ef0f6e4..e19edb187aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2102,6 +2102,8 @@ add_library(grpc src/core/lib/event_engine/posix_engine/lockfree_event.cc src/core/lib/event_engine/posix_engine/posix_endpoint.cc src/core/lib/event_engine/posix_engine/posix_engine.cc + src/core/lib/event_engine/posix_engine/posix_engine_listener.cc + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc src/core/lib/event_engine/posix_engine/timer.cc src/core/lib/event_engine/posix_engine/timer_heap.cc @@ -2763,6 +2765,8 @@ add_library(grpc_unsecure src/core/lib/event_engine/posix_engine/lockfree_event.cc src/core/lib/event_engine/posix_engine/posix_endpoint.cc src/core/lib/event_engine/posix_engine/posix_engine.cc + src/core/lib/event_engine/posix_engine/posix_engine_listener.cc + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc src/core/lib/event_engine/posix_engine/timer.cc src/core/lib/event_engine/posix_engine/timer_heap.cc @@ -15981,7 +15985,6 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_executable(posix_engine_listener_utils_test - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc test/core/event_engine/posix/posix_engine_listener_utils_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc @@ -16068,6 +16071,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) test/core/event_engine/test_suite/event_engine_test_utils.cc test/core/event_engine/test_suite/oracle_event_engine_posix.cc test/core/event_engine/test_suite/posix_event_engine_test.cc + test/core/event_engine/test_suite/server_test.cc test/core/event_engine/test_suite/timer_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc diff --git a/Makefile b/Makefile index 9b78c67fd57..c99f1c69e38 100644 --- a/Makefile +++ b/Makefile @@ -1401,6 +1401,8 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/posix_engine/lockfree_event.cc \ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ src/core/lib/event_engine/posix_engine/timer_heap.cc \ @@ -1921,6 +1923,8 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/posix_engine/lockfree_event.cc \ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ src/core/lib/event_engine/posix_engine/timer_heap.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b37f790343b..cb7b95b9171 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -754,6 +754,8 @@ libs: - src/core/lib/event_engine/posix_engine/posix_endpoint.h - src/core/lib/event_engine/posix_engine/posix_engine.h - src/core/lib/event_engine/posix_engine/posix_engine_closure.h + - src/core/lib/event_engine/posix_engine/posix_engine_listener.h + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h - src/core/lib/event_engine/posix_engine/timer.h - src/core/lib/event_engine/posix_engine/timer_heap.h @@ -1485,6 +1487,8 @@ libs: - src/core/lib/event_engine/posix_engine/lockfree_event.cc - src/core/lib/event_engine/posix_engine/posix_endpoint.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc + - src/core/lib/event_engine/posix_engine/posix_engine_listener.cc + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc - src/core/lib/event_engine/posix_engine/timer.cc - src/core/lib/event_engine/posix_engine/timer_heap.cc @@ -2025,6 +2029,8 @@ libs: - src/core/lib/event_engine/posix_engine/posix_endpoint.h - src/core/lib/event_engine/posix_engine/posix_engine.h - src/core/lib/event_engine/posix_engine/posix_engine_closure.h + - src/core/lib/event_engine/posix_engine/posix_engine_listener.h + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h - src/core/lib/event_engine/posix_engine/tcp_socket_utils.h - src/core/lib/event_engine/posix_engine/timer.h - src/core/lib/event_engine/posix_engine/timer_heap.h @@ -2400,6 +2406,8 @@ libs: - src/core/lib/event_engine/posix_engine/lockfree_event.cc - src/core/lib/event_engine/posix_engine/posix_endpoint.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc + - src/core/lib/event_engine/posix_engine/posix_engine_listener.cc + - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc - src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc - src/core/lib/event_engine/posix_engine/timer.cc - src/core/lib/event_engine/posix_engine/timer_heap.cc @@ -9607,10 +9615,8 @@ targets: gtest: true build: test language: c++ - headers: - - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h + headers: [] src: - - src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc - test/core/event_engine/posix/posix_engine_listener_utils_test.cc deps: - grpc_test_util @@ -9651,6 +9657,7 @@ targets: - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/oracle_event_engine_posix.cc - test/core/event_engine/test_suite/posix_event_engine_test.cc + - test/core/event_engine/test_suite/server_test.cc - test/core/event_engine/test_suite/timer_test.cc deps: - grpc_test_util diff --git a/config.m4 b/config.m4 index a4170679fed..8f3b1671aa1 100644 --- a/config.m4 +++ b/config.m4 @@ -483,6 +483,8 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/posix_engine/lockfree_event.cc \ src/core/lib/event_engine/posix_engine/posix_endpoint.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \ + src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ src/core/lib/event_engine/posix_engine/timer_heap.cc \ diff --git a/config.w32 b/config.w32 index 499b0427109..bd21e25c36b 100644 --- a/config.w32 +++ b/config.w32 @@ -449,6 +449,8 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\posix_engine\\lockfree_event.cc " + "src\\core\\lib\\event_engine\\posix_engine\\posix_endpoint.cc " + "src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " + + "src\\core\\lib\\event_engine\\posix_engine\\posix_engine_listener.cc " + + "src\\core\\lib\\event_engine\\posix_engine\\posix_engine_listener_utils.cc " + "src\\core\\lib\\event_engine\\posix_engine\\tcp_socket_utils.cc " + "src\\core\\lib\\event_engine\\posix_engine\\timer.cc " + "src\\core\\lib\\event_engine\\posix_engine\\timer_heap.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index d6cd951978c..f3fbbf017e0 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -702,6 +702,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/posix_endpoint.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', 'src/core/lib/event_engine/posix_engine/posix_engine_closure.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/posix_engine/timer.h', 'src/core/lib/event_engine/posix_engine/timer_heap.h', @@ -1587,6 +1589,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/posix_endpoint.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', 'src/core/lib/event_engine/posix_engine/posix_engine_closure.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/posix_engine/timer.h', 'src/core/lib/event_engine/posix_engine/timer_heap.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3a439bddefa..4d514c6aaa6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1079,6 +1079,10 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/posix_engine.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.h', 'src/core/lib/event_engine/posix_engine/posix_engine_closure.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/posix_engine/timer.cc', @@ -2227,6 +2231,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/posix_engine/posix_endpoint.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', 'src/core/lib/event_engine/posix_engine/posix_engine_closure.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.h', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/posix_engine/timer.h', 'src/core/lib/event_engine/posix_engine/timer_heap.h', diff --git a/grpc.gemspec b/grpc.gemspec index f4c99ecd4de..652b2eb789c 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -990,6 +990,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.h ) s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_closure.h ) + s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener.cc ) + s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener.h ) + s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc ) + s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h ) s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc ) s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.h ) s.files += %w( src/core/lib/event_engine/posix_engine/timer.cc ) diff --git a/grpc.gyp b/grpc.gyp index 17717d51c11..5598ab5f5b2 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -813,6 +813,8 @@ 'src/core/lib/event_engine/posix_engine/lockfree_event.cc', 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', 'src/core/lib/event_engine/posix_engine/timer_heap.cc', @@ -1280,6 +1282,8 @@ 'src/core/lib/event_engine/posix_engine/lockfree_event.cc', 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', 'src/core/lib/event_engine/posix_engine/timer_heap.cc', diff --git a/package.xml b/package.xml index 4824cfc4fd7..301b340a7cc 100644 --- a/package.xml +++ b/package.xml @@ -972,6 +972,10 @@ + + + + diff --git a/src/core/BUILD b/src/core/BUILD index 327a6206355..8c8684e258b 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1732,6 +1732,37 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "posix_event_engine_listener", + srcs = [ + "lib/event_engine/posix_engine/posix_engine_listener.cc", + ], + hdrs = [ + "lib/event_engine/posix_engine/posix_engine_listener.h", + ], + external_deps = [ + "absl/base:core_headers", + "absl/functional:any_invocable", + "absl/status", + "absl/status:statusor", + "absl/strings", + "absl/synchronization", + "absl/types:optional", + ], + deps = [ + "iomgr_port", + "posix_event_engine_closure", + "posix_event_engine_endpoint", + "posix_event_engine_event_poller", + "posix_event_engine_listener_utils", + "posix_event_engine_tcp_socket_utils", + "socket_mutator", + "status_helper", + "//:event_engine_base_hdrs", + "//:gpr", + ], +) + grpc_cc_library( name = "posix_event_engine", srcs = ["lib/event_engine/posix_engine/posix_engine.cc"], @@ -1759,6 +1790,7 @@ grpc_cc_library( "posix_event_engine_closure", "posix_event_engine_endpoint", "posix_event_engine_event_poller", + "posix_event_engine_listener", "posix_event_engine_poller_posix_default", "posix_event_engine_tcp_socket_utils", "posix_event_engine_timer", diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 6071858b0dc..aade89b967e 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -50,6 +50,7 @@ #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h" #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h" #endif // GRPC_POSIX_SOCKET_TCP // IWYU pragma: no_include @@ -62,6 +63,7 @@ namespace experimental { #ifdef GRPC_POSIX_SOCKET_TCP using ::grpc_event_engine::posix_engine::EventHandle; using ::grpc_event_engine::posix_engine::PosixEngineClosure; +using ::grpc_event_engine::posix_engine::PosixEngineListener; using ::grpc_event_engine::posix_engine::PosixEventPoller; using ::grpc_event_engine::posix_engine::PosixSocketWrapper; using ::grpc_event_engine::posix_engine::PosixTcpOptions; @@ -566,11 +568,19 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect( absl::StatusOr> PosixEventEngine::CreateListener( - Listener::AcceptCallback /*on_accept*/, - absl::AnyInvocable /*on_shutdown*/, - const EndpointConfig& /*config*/, - std::unique_ptr /*memory_allocator_factory*/) { - GPR_ASSERT(false && "unimplemented"); + Listener::AcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const EndpointConfig& config, + std::unique_ptr memory_allocator_factory) { +#ifdef GRPC_POSIX_SOCKET_TCP + return std::make_unique( + std::move(on_accept), std::move(on_shutdown), config, + std::move(memory_allocator_factory), poller_manager_->Poller(), + shared_from_this()); +#else // GRPC_POSIX_SOCKET_TCP + GPR_ASSERT(false && + "EventEngine::CreateListener is not supported on this platform"); +#endif // GRPC_POSIX_SOCKET_TCP } } // namespace experimental diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc new file mode 100644 index 00000000000..42fed6baa28 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -0,0 +1,235 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h" + +#include +#include + +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/types/optional.h" + +#include +#include +#include + +#include "src/core/lib/gprpp/status_helper.h" +#ifdef GRPC_POSIX_SOCKET_TCP +#include // IWYU pragma: keep +#include // IWYU pragma: keep +#include // IWYU pragma: keep + +#include "src/core/lib/event_engine/posix_engine/event_poller.h" +#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h" +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#include "src/core/lib/iomgr/socket_mutator.h" +#endif + +namespace grpc_event_engine { +namespace posix_engine { + +#ifdef GRPC_POSIX_SOCKET_TCP +PosixEngineListenerImpl::PosixEngineListenerImpl( + EventEngine::Listener::AcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const grpc_event_engine::experimental::EndpointConfig& config, + std::unique_ptr + memory_allocator_factory, + PosixEventPoller* poller, std::shared_ptr engine) + : poller_(poller), + options_(TcpOptionsFromEndpointConfig(config)), + engine_(std::move(engine)), + acceptors_(this), + on_accept_(std::move(on_accept)), + on_shutdown_(std::move(on_shutdown)), + memory_allocator_factory_(std::move(memory_allocator_factory)) {} + +absl::StatusOr PosixEngineListenerImpl::Bind( + const EventEngine::ResolvedAddress& addr) { + EventEngine::ResolvedAddress res_addr = addr; + EventEngine::ResolvedAddress addr6_v4mapped; + int requested_port = SockaddrGetPort(res_addr); + absl::MutexLock lock(&this->mu_); + GPR_ASSERT(!this->started_); + GPR_ASSERT(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES); + UnlinkIfUnixDomainSocket(addr); + + /// Check if this is a wildcard port, and if so, try to keep the port the same + /// as some previously created listener socket. + for (auto it = acceptors_.begin(); + requested_port == 0 && it != acceptors_.end(); it++) { + EventEngine::ResolvedAddress sockname_temp; + socklen_t len = static_cast(sizeof(struct sockaddr_storage)); + if (0 == getsockname((*it)->Socket().sock.Fd(), + const_cast(sockname_temp.address()), + &len)) { + int used_port = SockaddrGetPort(sockname_temp); + if (used_port > 0) { + requested_port = used_port; + SockaddrSetPort(res_addr, requested_port); + break; + } + } + } + + auto used_port = SockaddrIsWildcard(res_addr); + if (used_port.has_value()) { + requested_port = *used_port; + return ListenerContainerAddWildcardAddresses(acceptors_, options_, + requested_port); + } + if (SockaddrToV4Mapped(&res_addr, &addr6_v4mapped)) { + res_addr = addr6_v4mapped; + } + + auto result = CreateAndPrepareListenerSocket(options_, res_addr); + GRPC_RETURN_IF_ERROR(result.status()); + acceptors_.Append(*result); + return result->port; +} + +void PosixEngineListenerImpl::AsyncConnectionAcceptor::Start() { + Ref(); + handle_->NotifyOnRead(notify_on_accept_); +} + +void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( + absl::Status status) { + if (!status.ok()) { + // Shutting down the acceptor. Unref the ref grabbed in + // AsyncConnectionAcceptor::Start(). + Unref(); + return; + } + // loop until accept4 returns EAGAIN, and then re-arm notification. + for (;;) { + EventEngine::ResolvedAddress addr; + memset(const_cast(addr.address()), 0, addr.size()); + // Note: If we ever decide to return this address to the user, remember to + // strip off the ::ffff:0.0.0.0/96 prefix first. + int fd = Accept4(handle_->WrappedFd(), addr, 1, 1); + if (fd < 0) { + switch (errno) { + case EINTR: + continue; + case EAGAIN: + case ECONNABORTED: + handle_->NotifyOnRead(notify_on_accept_); + return; + default: + gpr_log(GPR_ERROR, "Closing acceptor. Failed accept4: %s", + strerror(errno)); + // Shutting down the acceptor. Unref the ref grabbed in + // AsyncConnectionAcceptor::Start(). + Unref(); + return; + } + } + + // For UNIX sockets, the accept call might not fill up the member + // sun_path of sockaddr_un, so explicitly call getsockname to get it. + if (addr.address()->sa_family == AF_UNIX) { + socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; + if (getsockname(fd, const_cast(addr.address()), &len) < 0) { + gpr_log(GPR_ERROR, "Closing acceptor. Failed getsockname: %s", + strerror(errno)); + close(fd); + // Shutting down the acceptor. Unref the ref grabbed in + // AsyncConnectionAcceptor::Start(). + Unref(); + return; + } + } + + PosixSocketWrapper sock(fd); + (void)sock.SetSocketNoSigpipeIfPossible(); + auto result = sock.ApplySocketMutatorInOptions( + GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_); + if (!result.ok()) { + gpr_log(GPR_ERROR, "Closing acceptor. Failed to apply socket mutator: %s", + result.ToString().c_str()); + // Shutting down the acceptor. Unref the ref grabbed in + // AsyncConnectionAcceptor::Start(). + Unref(); + return; + } + + // Create an Endpoint here. + std::string peer_name = *SockaddrToString(&addr, true); + auto endpoint = CreatePosixEndpoint( + /*handle=*/listener_->poller_->CreateHandle( + fd, peer_name, listener_->poller_->CanTrackErrors()), + /*on_shutdown=*/nullptr, /*engine=*/listener_->engine_, + /*allocator=*/ + listener_->memory_allocator_factory_->CreateMemoryAllocator( + absl::StrCat("endpoint-tcp-server-connection: ", peer_name)), + /*options=*/listener_->options_); + // Call on_accept_ and then resume accepting new connections by continuing + // the parent for-loop. + listener_->on_accept_( + std::move(endpoint), + listener_->memory_allocator_factory_->CreateMemoryAllocator( + absl::StrCat("on-accept-tcp-server-connection: ", peer_name))); + } + GPR_UNREACHABLE_CODE(return); +} + +void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() { + // The ShutdownHandle whould trigger any waiting notify_on_accept_ to get + // scheduled with the not-OK status. + handle_->ShutdownHandle(absl::InternalError("Shutting down acceptor")); + Unref(); +} + +absl::Status PosixEngineListenerImpl::Start() { + absl::MutexLock lock(&this->mu_); + // Start each asynchronous acceptor. + GPR_ASSERT(!this->started_); + this->started_ = true; + for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) { + (*it)->Start(); + } + return absl::OkStatus(); +} + +void PosixEngineListenerImpl::TriggerShutdown() { + // This would get invoked from the destructor of the parent + // PosixEngineListener object. + absl::MutexLock lock(&this->mu_); + for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) { + // Trigger shutdown of each asynchronous acceptor. This in-turn calls + // ShutdownHandle on the associated poller event handle. It may also + // immediately delete the asynchronous acceptor if the acceptor was never + // started. + (*it)->Shutdown(); + } +} + +PosixEngineListenerImpl::~PosixEngineListenerImpl() { + // This should get invoked only after all the AsyncConnectionAcceptor's have + // been destroyed. This is because each AsyncConnectionAcceptor has a + // shared_ptr ref to the parent PosixEngineListenerImpl. + if (on_shutdown_ != nullptr) { + on_shutdown_(absl::InternalError("Shutting down listener")); + } +} + +#endif // GRPC_POSIX_SOCKET_TCP + +} // namespace posix_engine +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h new file mode 100644 index 00000000000..dfb45642934 --- /dev/null +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -0,0 +1,223 @@ +// Copyright 2022 gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H +#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H + +#include + +#include + +#include +#include +#include +#include +#include + +#include "absl/base/thread_annotations.h" +#include "absl/functional/any_invocable.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/synchronization/mutex.h" + +#include +#include +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP +#include "src/core/lib/event_engine/posix_engine/event_poller.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" +#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" +#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" +#endif + +namespace grpc_event_engine { +namespace posix_engine { + +#ifdef GRPC_POSIX_SOCKET_TCP +class PosixEngineListenerImpl + : public std::enable_shared_from_this { + public: + PosixEngineListenerImpl( + EventEngine::Listener::AcceptCallback on_accept, + absl::AnyInvocable on_shutdown, + const grpc_event_engine::experimental::EndpointConfig& config, + std::unique_ptr + memory_allocator_factory, + PosixEventPoller* poller, std::shared_ptr engine); + // Binds an address to the listener. This creates a ListenerSocket + // and sets its fields appropriately. + absl::StatusOr Bind(const EventEngine::ResolvedAddress& addr); + // Signals event manager to listen for connections on all created sockets. + absl::Status Start(); + // Trigger graceful shutdown of all asynchronous accept operations. + void TriggerShutdown(); + + ~PosixEngineListenerImpl(); + + private: + // This class represents accepting for one bind fd belonging to the listener. + // Each AsyncConnectionAcceptor takes a ref to the parent + // PosixEngineListenerImpl object. So the PosixEngineListenerImpl can be + // deleted only after all AsyncConnectionAcceptor's get destroyed. + class AsyncConnectionAcceptor { + public: + AsyncConnectionAcceptor(std::shared_ptr engine, + std::shared_ptr listener, + ListenerSocketsContainer::ListenerSocket socket) + : engine_(std::move(engine)), + listener_(std::move(listener)), + socket_(socket), + handle_(listener_->poller_->CreateHandle( + socket_.sock.Fd(), *SockaddrToString(&socket_.addr, true), + listener_->poller_->CanTrackErrors())), + notify_on_accept_(PosixEngineClosure::ToPermanentClosure( + [this](absl::Status status) { NotifyOnAccept(status); })){}; + // Start listening for incoming connections on the socket. + void Start(); + // Internal callback invoked when the socket has incoming connections to + // process. + void NotifyOnAccept(absl::Status status); + // Shutdown the poller handle associated with this socket. + void Shutdown(); + void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); } + void Unref() { + if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + delete this; + } + } + ListenerSocketsContainer::ListenerSocket& Socket() { return socket_; } + ~AsyncConnectionAcceptor() { + handle_->OrphanHandle(nullptr, nullptr, ""); + delete notify_on_accept_; + } + + private: + std::atomic ref_count_{1}; + std::shared_ptr engine_; + std::shared_ptr listener_; + ListenerSocketsContainer::ListenerSocket socket_; + EventHandle* handle_; + PosixEngineClosure* notify_on_accept_; + }; + class ListenerAsyncAcceptors : public ListenerSocketsContainer { + public: + explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener) + : listener_(listener){}; + void Append(ListenerSocket socket) override { + acceptors_.push_back(new AsyncConnectionAcceptor( + listener_->engine_, listener_->shared_from_this(), socket)); + } + + absl::StatusOr Find( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& + addr) override { + for (auto acceptor = acceptors_.begin(); acceptor != acceptors_.end(); + ++acceptor) { + if ((*acceptor)->Socket().addr.size() == addr.size() && + memcmp((*acceptor)->Socket().addr.address(), addr.address(), + addr.size()) == 0) { + return (*acceptor)->Socket(); + } + } + return absl::NotFoundError("Socket not found!"); + } + + int Size() { return static_cast(acceptors_.size()); } + + std::list::const_iterator begin() { + return acceptors_.begin(); + } + std::list::const_iterator end() { + return acceptors_.end(); + } + + private: + std::list acceptors_; + PosixEngineListenerImpl* listener_; + }; + friend class ListenerAsyncAcceptors; + friend class AsyncConnectionAcceptor; + // The mutex ensures thread safety when multiple threads try to call Bind + // and Start in parallel. + absl::Mutex mu_; + PosixEventPoller* poller_; + PosixTcpOptions options_; + std::shared_ptr engine_; + // Linked list of sockets. One is created upon each successful bind + // operation. + ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_); + // Callback to be invoked upon accepting a connection. + EventEngine::Listener::AcceptCallback on_accept_; + // Callback to be invoked upon shutdown of listener. + absl::AnyInvocable on_shutdown_; + // Set to true when the listener has started listening for new connections. + // Any further bind operations would fail. + bool started_ ABSL_GUARDED_BY(mu_) = false; + // Pointer to a slice allocator factory object which can generate + // unique slice allocators for each new incoming connection. + std::unique_ptr + memory_allocator_factory_; +}; + +class PosixEngineListener + : public grpc_event_engine::experimental::EventEngine::Listener { + public: + PosixEngineListener( + grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback + on_accept, + absl::AnyInvocable on_shutdown, + const grpc_event_engine::experimental::EndpointConfig& config, + std::unique_ptr + memory_allocator_factory, + PosixEventPoller* poller, std::shared_ptr engine) + : impl_(std::make_shared( + std::move(on_accept), std::move(on_shutdown), config, + std::move(memory_allocator_factory), poller, std::move(engine))) {} + ~PosixEngineListener() override { impl_->TriggerShutdown(); }; + absl::StatusOr Bind( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) + override { + return impl_->Bind(addr); + } + absl::Status Start() override { return impl_->Start(); } + + private: + std::shared_ptr impl_; +}; + +#else // GRPC_POSIX_SOCKET_TCP + +class PosixEngineListener + : public grpc_event_engine::experimental::EventEngine::Listener { + public: + PosixEngineListener() = default; + ~PosixEngineListener() override = default; + absl::StatusOr Bind(const grpc_event_engine::experimental::EventEngine:: + ResolvedAddress& /*addr*/) override { + GPR_ASSERT(false && + "EventEngine::Listener::Bind not supported on this platform"); + } + absl::Status Start() override { + GPR_ASSERT(false && + "EventEngine::Listener::Start not supported on this platform"); + } +}; + +#endif + +} // namespace posix_engine +} // namespace grpc_event_engine +#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc index 55c02408300..a87d6730bd5 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc @@ -19,7 +19,6 @@ #include #include #include -#include #include #include @@ -41,6 +40,7 @@ #include // IWYU pragma: keep #include // IWYU pragma: keep #include // IWYU pragma: keep +#include // IWYU pragma: keep #include "absl/strings/str_cat.h" #endif @@ -283,6 +283,9 @@ absl::StatusOr ListenerContainerAddAllLocalAddresses( return assigned_port; #else + (void)listener_sockets; + (void)options; + (void)requested_port; GPR_ASSERT(false && "System does not support ifaddrs"); #endif } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a09f7c3e5de..6d80d47ff69 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -458,6 +458,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/posix_engine/lockfree_event.cc', 'src/core/lib/event_engine/posix_engine/posix_endpoint.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc', + 'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc', 'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', 'src/core/lib/event_engine/posix_engine/timer_heap.cc', diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index e5ff970a95d..d690fad45d2 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -85,6 +85,7 @@ grpc_cc_test( deps = [ ":client", ":oracle_event_engine_posix", + ":server", "//src/core:posix_event_engine", "//test/core/event_engine/test_suite:timer", ], diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 0f1aabb3677..68dadd19e18 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1977,6 +1977,10 @@ src/core/lib/event_engine/posix_engine/posix_endpoint.h \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ src/core/lib/event_engine/posix_engine/posix_engine.h \ src/core/lib/event_engine/posix_engine/posix_engine_closure.h \ +src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \ +src/core/lib/event_engine/posix_engine/posix_engine_listener.h \ +src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \ +src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \ src/core/lib/event_engine/posix_engine/timer.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index b821183bd35..f9f529d5a5e 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1763,6 +1763,10 @@ src/core/lib/event_engine/posix_engine/posix_endpoint.h \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ src/core/lib/event_engine/posix_engine/posix_engine.h \ src/core/lib/event_engine/posix_engine/posix_engine_closure.h \ +src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \ +src/core/lib/event_engine/posix_engine/posix_engine_listener.h \ +src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \ +src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \ src/core/lib/event_engine/posix_engine/timer.cc \