Posix Event Engine listener implementation (#31513)

* Util functions to help with posix engine listener implementation

* sanity

* update comments in posix_engine_listener_utils.h

* review comments

* iwyu

* revert prev commit

* iwyu

* update build

* update

* regenerate projects

* regenerate projects

* minor fixes

* update BUILD

* sanity

* update build

* regenerate projects

* fix unused parameter

* sanity

* update

* sanity

* regenerate_projects

* remove unused variable
pull/31678/head
Vignesh Babu 2 years ago committed by GitHub
parent 0d4000dd48
commit c5a66bb08d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      CMakeLists.txt
  2. 4
      Makefile
  3. 13
      build_autogenerated.yaml
  4. 2
      config.m4
  5. 2
      config.w32
  6. 4
      gRPC-C++.podspec
  7. 6
      gRPC-Core.podspec
  8. 4
      grpc.gemspec
  9. 4
      grpc.gyp
  10. 4
      package.xml
  11. 32
      src/core/BUILD
  12. 20
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  13. 235
      src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
  14. 223
      src/core/lib/event_engine/posix_engine/posix_engine_listener.h
  15. 5
      src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
  16. 2
      src/python/grpcio/grpc_core_dependencies.py
  17. 1
      test/core/event_engine/test_suite/BUILD
  18. 4
      tools/doxygen/Doxyfile.c++.internal
  19. 4
      tools/doxygen/Doxyfile.core.internal

6
CMakeLists.txt generated

@ -2102,6 +2102,8 @@ add_library(grpc
src/core/lib/event_engine/posix_engine/lockfree_event.cc
src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
@ -2763,6 +2765,8 @@ add_library(grpc_unsecure
src/core/lib/event_engine/posix_engine/lockfree_event.cc
src/core/lib/event_engine/posix_engine/posix_endpoint.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
src/core/lib/event_engine/posix_engine/timer.cc
src/core/lib/event_engine/posix_engine/timer_heap.cc
@ -15981,7 +15985,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(posix_engine_listener_utils_test
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
test/core/event_engine/posix/posix_engine_listener_utils_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
@ -16068,6 +16071,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
test/core/event_engine/test_suite/event_engine_test_utils.cc
test/core/event_engine/test_suite/oracle_event_engine_posix.cc
test/core/event_engine/test_suite/posix_event_engine_test.cc
test/core/event_engine/test_suite/server_test.cc
test/core/event_engine/test_suite/timer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc

4
Makefile generated

@ -1401,6 +1401,8 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \
@ -1921,6 +1923,8 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \

@ -754,6 +754,8 @@ libs:
- src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
@ -1485,6 +1487,8 @@ libs:
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
- src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/timer.cc
- src/core/lib/event_engine/posix_engine/timer_heap.cc
@ -2025,6 +2029,8 @@ libs:
- src/core/lib/event_engine/posix_engine/posix_endpoint.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
- src/core/lib/event_engine/posix_engine/posix_engine_closure.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener.h
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
- src/core/lib/event_engine/posix_engine/timer.h
- src/core/lib/event_engine/posix_engine/timer_heap.h
@ -2400,6 +2406,8 @@ libs:
- src/core/lib/event_engine/posix_engine/lockfree_event.cc
- src/core/lib/event_engine/posix_engine/posix_endpoint.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener.cc
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/posix_engine/timer.cc
- src/core/lib/event_engine/posix_engine/timer_heap.cc
@ -9607,10 +9615,8 @@ targets:
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h
headers: []
src:
- src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc
- test/core/event_engine/posix/posix_engine_listener_utils_test.cc
deps:
- grpc_test_util
@ -9651,6 +9657,7 @@ targets:
- test/core/event_engine/test_suite/event_engine_test_utils.cc
- test/core/event_engine/test_suite/oracle_event_engine_posix.cc
- test/core/event_engine/test_suite/posix_event_engine_test.cc
- test/core/event_engine/test_suite/server_test.cc
- test/core/event_engine/test_suite/timer_test.cc
deps:
- grpc_test_util

2
config.m4 generated

@ -483,6 +483,8 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/posix_engine/lockfree_event.cc \
src/core/lib/event_engine/posix_engine/posix_endpoint.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
src/core/lib/event_engine/posix_engine/timer_heap.cc \

2
config.w32 generated

@ -449,6 +449,8 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\posix_engine\\lockfree_event.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_endpoint.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine_listener.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine_listener_utils.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\tcp_socket_utils.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer_heap.cc " +

4
gRPC-C++.podspec generated

@ -702,6 +702,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',
@ -1587,6 +1589,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',

6
gRPC-Core.podspec generated

@ -1079,6 +1079,10 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.cc',
@ -2227,6 +2231,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/posix_engine/posix_endpoint.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
'src/core/lib/event_engine/posix_engine/posix_engine_closure.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.h',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/posix_engine/timer.h',
'src/core/lib/event_engine/posix_engine/timer_heap.h',

4
grpc.gemspec generated

@ -990,6 +990,10 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine.h )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_closure.h )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener.h )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h )
s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc )
s.files += %w( src/core/lib/event_engine/posix_engine/tcp_socket_utils.h )
s.files += %w( src/core/lib/event_engine/posix_engine/timer.cc )

4
grpc.gyp generated

@ -813,6 +813,8 @@
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',
@ -1280,6 +1282,8 @@
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',

4
package.xml generated

@ -972,6 +972,10 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine_closure.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine_listener.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine_listener.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/posix_engine/timer.cc" role="src" />

@ -1732,6 +1732,37 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "posix_event_engine_listener",
srcs = [
"lib/event_engine/posix_engine/posix_engine_listener.cc",
],
hdrs = [
"lib/event_engine/posix_engine/posix_engine_listener.h",
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
"absl/types:optional",
],
deps = [
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_endpoint",
"posix_event_engine_event_poller",
"posix_event_engine_listener_utils",
"posix_event_engine_tcp_socket_utils",
"socket_mutator",
"status_helper",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
grpc_cc_library(
name = "posix_event_engine",
srcs = ["lib/event_engine/posix_engine/posix_engine.cc"],
@ -1759,6 +1790,7 @@ grpc_cc_library(
"posix_event_engine_closure",
"posix_event_engine_endpoint",
"posix_event_engine_event_poller",
"posix_event_engine_listener",
"posix_event_engine_poller_posix_default",
"posix_event_engine_tcp_socket_utils",
"posix_event_engine_timer",

@ -50,6 +50,7 @@
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
#endif // GRPC_POSIX_SOCKET_TCP
// IWYU pragma: no_include <ratio>
@ -62,6 +63,7 @@ namespace experimental {
#ifdef GRPC_POSIX_SOCKET_TCP
using ::grpc_event_engine::posix_engine::EventHandle;
using ::grpc_event_engine::posix_engine::PosixEngineClosure;
using ::grpc_event_engine::posix_engine::PosixEngineListener;
using ::grpc_event_engine::posix_engine::PosixEventPoller;
using ::grpc_event_engine::posix_engine::PosixSocketWrapper;
using ::grpc_event_engine::posix_engine::PosixTcpOptions;
@ -566,11 +568,19 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreateListener(
Listener::AcceptCallback /*on_accept*/,
absl::AnyInvocable<void(absl::Status)> /*on_shutdown*/,
const EndpointConfig& /*config*/,
std::unique_ptr<MemoryAllocatorFactory> /*memory_allocator_factory*/) {
GPR_ASSERT(false && "unimplemented");
Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
#ifdef GRPC_POSIX_SOCKET_TCP
return std::make_unique<PosixEngineListener>(
std::move(on_accept), std::move(on_shutdown), config,
std::move(memory_allocator_factory), poller_manager_->Poller(),
shared_from_this());
#else // GRPC_POSIX_SOCKET_TCP
GPR_ASSERT(false &&
"EventEngine::CreateListener is not supported on this platform");
#endif // GRPC_POSIX_SOCKET_TCP
}
} // namespace experimental

@ -0,0 +1,235 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
#include <string>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/status_helper.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include <errno.h> // IWYU pragma: keep
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#endif
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_POSIX_SOCKET_TCP
PosixEngineListenerImpl::PosixEngineListenerImpl(
EventEngine::Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
memory_allocator_factory,
PosixEventPoller* poller, std::shared_ptr<EventEngine> engine)
: poller_(poller),
options_(TcpOptionsFromEndpointConfig(config)),
engine_(std::move(engine)),
acceptors_(this),
on_accept_(std::move(on_accept)),
on_shutdown_(std::move(on_shutdown)),
memory_allocator_factory_(std::move(memory_allocator_factory)) {}
absl::StatusOr<int> PosixEngineListenerImpl::Bind(
const EventEngine::ResolvedAddress& addr) {
EventEngine::ResolvedAddress res_addr = addr;
EventEngine::ResolvedAddress addr6_v4mapped;
int requested_port = SockaddrGetPort(res_addr);
absl::MutexLock lock(&this->mu_);
GPR_ASSERT(!this->started_);
GPR_ASSERT(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES);
UnlinkIfUnixDomainSocket(addr);
/// Check if this is a wildcard port, and if so, try to keep the port the same
/// as some previously created listener socket.
for (auto it = acceptors_.begin();
requested_port == 0 && it != acceptors_.end(); it++) {
EventEngine::ResolvedAddress sockname_temp;
socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
if (0 == getsockname((*it)->Socket().sock.Fd(),
const_cast<sockaddr*>(sockname_temp.address()),
&len)) {
int used_port = SockaddrGetPort(sockname_temp);
if (used_port > 0) {
requested_port = used_port;
SockaddrSetPort(res_addr, requested_port);
break;
}
}
}
auto used_port = SockaddrIsWildcard(res_addr);
if (used_port.has_value()) {
requested_port = *used_port;
return ListenerContainerAddWildcardAddresses(acceptors_, options_,
requested_port);
}
if (SockaddrToV4Mapped(&res_addr, &addr6_v4mapped)) {
res_addr = addr6_v4mapped;
}
auto result = CreateAndPrepareListenerSocket(options_, res_addr);
GRPC_RETURN_IF_ERROR(result.status());
acceptors_.Append(*result);
return result->port;
}
void PosixEngineListenerImpl::AsyncConnectionAcceptor::Start() {
Ref();
handle_->NotifyOnRead(notify_on_accept_);
}
void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
absl::Status status) {
if (!status.ok()) {
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
// loop until accept4 returns EAGAIN, and then re-arm notification.
for (;;) {
EventEngine::ResolvedAddress addr;
memset(const_cast<sockaddr*>(addr.address()), 0, addr.size());
// Note: If we ever decide to return this address to the user, remember to
// strip off the ::ffff:0.0.0.0/96 prefix first.
int fd = Accept4(handle_->WrappedFd(), addr, 1, 1);
if (fd < 0) {
switch (errno) {
case EINTR:
continue;
case EAGAIN:
case ECONNABORTED:
handle_->NotifyOnRead(notify_on_accept_);
return;
default:
gpr_log(GPR_ERROR, "Closing acceptor. Failed accept4: %s",
strerror(errno));
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
}
// For UNIX sockets, the accept call might not fill up the member
// sun_path of sockaddr_un, so explicitly call getsockname to get it.
if (addr.address()->sa_family == AF_UNIX) {
socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
if (getsockname(fd, const_cast<sockaddr*>(addr.address()), &len) < 0) {
gpr_log(GPR_ERROR, "Closing acceptor. Failed getsockname: %s",
strerror(errno));
close(fd);
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
}
PosixSocketWrapper sock(fd);
(void)sock.SetSocketNoSigpipeIfPossible();
auto result = sock.ApplySocketMutatorInOptions(
GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_);
if (!result.ok()) {
gpr_log(GPR_ERROR, "Closing acceptor. Failed to apply socket mutator: %s",
result.ToString().c_str());
// Shutting down the acceptor. Unref the ref grabbed in
// AsyncConnectionAcceptor::Start().
Unref();
return;
}
// Create an Endpoint here.
std::string peer_name = *SockaddrToString(&addr, true);
auto endpoint = CreatePosixEndpoint(
/*handle=*/listener_->poller_->CreateHandle(
fd, peer_name, listener_->poller_->CanTrackErrors()),
/*on_shutdown=*/nullptr, /*engine=*/listener_->engine_,
/*allocator=*/
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("endpoint-tcp-server-connection: ", peer_name)),
/*options=*/listener_->options_);
// Call on_accept_ and then resume accepting new connections by continuing
// the parent for-loop.
listener_->on_accept_(
std::move(endpoint),
listener_->memory_allocator_factory_->CreateMemoryAllocator(
absl::StrCat("on-accept-tcp-server-connection: ", peer_name)));
}
GPR_UNREACHABLE_CODE(return);
}
void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() {
// The ShutdownHandle whould trigger any waiting notify_on_accept_ to get
// scheduled with the not-OK status.
handle_->ShutdownHandle(absl::InternalError("Shutting down acceptor"));
Unref();
}
absl::Status PosixEngineListenerImpl::Start() {
absl::MutexLock lock(&this->mu_);
// Start each asynchronous acceptor.
GPR_ASSERT(!this->started_);
this->started_ = true;
for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
(*it)->Start();
}
return absl::OkStatus();
}
void PosixEngineListenerImpl::TriggerShutdown() {
// This would get invoked from the destructor of the parent
// PosixEngineListener object.
absl::MutexLock lock(&this->mu_);
for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
// Trigger shutdown of each asynchronous acceptor. This in-turn calls
// ShutdownHandle on the associated poller event handle. It may also
// immediately delete the asynchronous acceptor if the acceptor was never
// started.
(*it)->Shutdown();
}
}
PosixEngineListenerImpl::~PosixEngineListenerImpl() {
// This should get invoked only after all the AsyncConnectionAcceptor's have
// been destroyed. This is because each AsyncConnectionAcceptor has a
// shared_ptr ref to the parent PosixEngineListenerImpl.
if (on_shutdown_ != nullptr) {
on_shutdown_(absl::InternalError("Shutting down listener"));
}
}
#endif // GRPC_POSIX_SOCKET_TCP
} // namespace posix_engine
} // namespace grpc_event_engine

@ -0,0 +1,223 @@
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H
#include <grpc/support/port_platform.h>
#include <string.h>
#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/mutex.h"
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#endif
namespace grpc_event_engine {
namespace posix_engine {
#ifdef GRPC_POSIX_SOCKET_TCP
class PosixEngineListenerImpl
: public std::enable_shared_from_this<PosixEngineListenerImpl> {
public:
PosixEngineListenerImpl(
EventEngine::Listener::AcceptCallback on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
memory_allocator_factory,
PosixEventPoller* poller, std::shared_ptr<EventEngine> engine);
// Binds an address to the listener. This creates a ListenerSocket
// and sets its fields appropriately.
absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr);
// Signals event manager to listen for connections on all created sockets.
absl::Status Start();
// Trigger graceful shutdown of all asynchronous accept operations.
void TriggerShutdown();
~PosixEngineListenerImpl();
private:
// This class represents accepting for one bind fd belonging to the listener.
// Each AsyncConnectionAcceptor takes a ref to the parent
// PosixEngineListenerImpl object. So the PosixEngineListenerImpl can be
// deleted only after all AsyncConnectionAcceptor's get destroyed.
class AsyncConnectionAcceptor {
public:
AsyncConnectionAcceptor(std::shared_ptr<EventEngine> engine,
std::shared_ptr<PosixEngineListenerImpl> listener,
ListenerSocketsContainer::ListenerSocket socket)
: engine_(std::move(engine)),
listener_(std::move(listener)),
socket_(socket),
handle_(listener_->poller_->CreateHandle(
socket_.sock.Fd(), *SockaddrToString(&socket_.addr, true),
listener_->poller_->CanTrackErrors())),
notify_on_accept_(PosixEngineClosure::ToPermanentClosure(
[this](absl::Status status) { NotifyOnAccept(status); })){};
// Start listening for incoming connections on the socket.
void Start();
// Internal callback invoked when the socket has incoming connections to
// process.
void NotifyOnAccept(absl::Status status);
// Shutdown the poller handle associated with this socket.
void Shutdown();
void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
delete this;
}
}
ListenerSocketsContainer::ListenerSocket& Socket() { return socket_; }
~AsyncConnectionAcceptor() {
handle_->OrphanHandle(nullptr, nullptr, "");
delete notify_on_accept_;
}
private:
std::atomic<int> ref_count_{1};
std::shared_ptr<EventEngine> engine_;
std::shared_ptr<PosixEngineListenerImpl> listener_;
ListenerSocketsContainer::ListenerSocket socket_;
EventHandle* handle_;
PosixEngineClosure* notify_on_accept_;
};
class ListenerAsyncAcceptors : public ListenerSocketsContainer {
public:
explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener)
: listener_(listener){};
void Append(ListenerSocket socket) override {
acceptors_.push_back(new AsyncConnectionAcceptor(
listener_->engine_, listener_->shared_from_this(), socket));
}
absl::StatusOr<ListenerSocket> Find(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
addr) override {
for (auto acceptor = acceptors_.begin(); acceptor != acceptors_.end();
++acceptor) {
if ((*acceptor)->Socket().addr.size() == addr.size() &&
memcmp((*acceptor)->Socket().addr.address(), addr.address(),
addr.size()) == 0) {
return (*acceptor)->Socket();
}
}
return absl::NotFoundError("Socket not found!");
}
int Size() { return static_cast<int>(acceptors_.size()); }
std::list<AsyncConnectionAcceptor*>::const_iterator begin() {
return acceptors_.begin();
}
std::list<AsyncConnectionAcceptor*>::const_iterator end() {
return acceptors_.end();
}
private:
std::list<AsyncConnectionAcceptor*> acceptors_;
PosixEngineListenerImpl* listener_;
};
friend class ListenerAsyncAcceptors;
friend class AsyncConnectionAcceptor;
// The mutex ensures thread safety when multiple threads try to call Bind
// and Start in parallel.
absl::Mutex mu_;
PosixEventPoller* poller_;
PosixTcpOptions options_;
std::shared_ptr<EventEngine> engine_;
// Linked list of sockets. One is created upon each successful bind
// operation.
ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_);
// Callback to be invoked upon accepting a connection.
EventEngine::Listener::AcceptCallback on_accept_;
// Callback to be invoked upon shutdown of listener.
absl::AnyInvocable<void(absl::Status)> on_shutdown_;
// Set to true when the listener has started listening for new connections.
// Any further bind operations would fail.
bool started_ ABSL_GUARDED_BY(mu_) = false;
// Pointer to a slice allocator factory object which can generate
// unique slice allocators for each new incoming connection.
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
memory_allocator_factory_;
};
class PosixEngineListener
: public grpc_event_engine::experimental::EventEngine::Listener {
public:
PosixEngineListener(
grpc_event_engine::experimental::EventEngine::Listener::AcceptCallback
on_accept,
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const grpc_event_engine::experimental::EndpointConfig& config,
std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
memory_allocator_factory,
PosixEventPoller* poller, std::shared_ptr<EventEngine> engine)
: impl_(std::make_shared<PosixEngineListenerImpl>(
std::move(on_accept), std::move(on_shutdown), config,
std::move(memory_allocator_factory), poller, std::move(engine))) {}
~PosixEngineListener() override { impl_->TriggerShutdown(); };
absl::StatusOr<int> Bind(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
override {
return impl_->Bind(addr);
}
absl::Status Start() override { return impl_->Start(); }
private:
std::shared_ptr<PosixEngineListenerImpl> impl_;
};
#else // GRPC_POSIX_SOCKET_TCP
class PosixEngineListener
: public grpc_event_engine::experimental::EventEngine::Listener {
public:
PosixEngineListener() = default;
~PosixEngineListener() override = default;
absl::StatusOr<int> Bind(const grpc_event_engine::experimental::EventEngine::
ResolvedAddress& /*addr*/) override {
GPR_ASSERT(false &&
"EventEngine::Listener::Bind not supported on this platform");
}
absl::Status Start() override {
GPR_ASSERT(false &&
"EventEngine::Listener::Start not supported on this platform");
}
};
#endif
} // namespace posix_engine
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H

@ -19,7 +19,6 @@
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <cstring>
#include <string>
@ -41,6 +40,7 @@
#include <ifaddrs.h> // IWYU pragma: keep
#include <netinet/in.h> // IWYU pragma: keep
#include <sys/socket.h> // IWYU pragma: keep
#include <unistd.h> // IWYU pragma: keep
#include "absl/strings/str_cat.h"
#endif
@ -283,6 +283,9 @@ absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
return assigned_port;
#else
(void)listener_sockets;
(void)options;
(void)requested_port;
GPR_ASSERT(false && "System does not support ifaddrs");
#endif
}

@ -458,6 +458,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/posix_engine/lockfree_event.cc',
'src/core/lib/event_engine/posix_engine/posix_endpoint.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener.cc',
'src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc',
'src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
'src/core/lib/event_engine/posix_engine/timer_heap.cc',

@ -85,6 +85,7 @@ grpc_cc_test(
deps = [
":client",
":oracle_event_engine_posix",
":server",
"//src/core:posix_event_engine",
"//test/core/event_engine/test_suite:timer",
],

@ -1977,6 +1977,10 @@ src/core/lib/event_engine/posix_engine/posix_endpoint.h \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine.h \
src/core/lib/event_engine/posix_engine/posix_engine_closure.h \
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener.h \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \
src/core/lib/event_engine/posix_engine/timer.cc \

@ -1763,6 +1763,10 @@ src/core/lib/event_engine/posix_engine/posix_endpoint.h \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/posix_engine.h \
src/core/lib/event_engine/posix_engine/posix_engine_closure.h \
src/core/lib/event_engine/posix_engine/posix_engine_listener.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener.h \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.cc \
src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/posix_engine/tcp_socket_utils.h \
src/core/lib/event_engine/posix_engine/timer.cc \

Loading…
Cancel
Save