From 2788b56867bb36aebdfe567ef1efdbce1799424e Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 12 Jul 2023 12:01:27 -0700 Subject: [PATCH] [iomgr][EventEngine] Improve server handling of file descriptor exhaustion (#33656) --- src/core/BUILD | 1 + .../event_engine/posix_engine/posix_engine.h | 1 + .../posix_engine/posix_engine_listener.cc | 29 ++++++++++++ .../posix_engine/posix_engine_listener.h | 3 ++ src/core/lib/iomgr/tcp_server_posix.cc | 46 ++++++++++++++----- src/core/lib/iomgr/tcp_server_utils_posix.h | 12 +++++ .../iomgr/tcp_server_utils_posix_common.cc | 21 +++++++++ 7 files changed, 101 insertions(+), 12 deletions(-) diff --git a/src/core/BUILD b/src/core/BUILD index 25277c162b6..fd5828ef742 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1944,6 +1944,7 @@ grpc_cc_library( "posix_event_engine_tcp_socket_utils", "socket_mutator", "status_helper", + "time", "//:event_engine_base_hdrs", "//:exec_ctx", "//:gpr", diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index 2084ab4041f..815a663d3d0 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -192,6 +192,7 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport, const DNSResolver::ResolverOptions& options) override; void Run(Closure* closure) override; void Run(absl::AnyInvocable closure) override; + // Caution!! The timer implementation cannot create any fds. See #20418. TaskHandle RunAfter(Duration when, Closure* closure) override; TaskHandle RunAfter(Duration when, absl::AnyInvocable closure) override; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc index 59facad83b5..bb97756dc7c 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.cc @@ -24,7 +24,9 @@ #include // IWYU pragma: keep #include // IWYU pragma: keep +#include #include +#include #include #include @@ -43,6 +45,7 @@ #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" #include "src/core/lib/event_engine/tcp_socket_utils.h" #include "src/core/lib/gprpp/status_helper.h" +#include "src/core/lib/gprpp/time.h" #include "src/core/lib/iomgr/socket_mutator.h" namespace grpc_event_engine { @@ -138,6 +141,32 @@ void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept( switch (errno) { case EINTR: continue; + case EMFILE: + // When the process runs out of fds, accept4() returns EMFILE. When + // this happens, the connection is left in the accept queue until + // either a read event triggers the on_read callback, or time has + // passed and the accept should be re-tried regardless. This callback + // is not cancelled, so a spurious wakeup may occur even when there's + // nothing to accept. This is not a performant code path, but if an fd + // limit has been reached, the system is likely in an unhappy state + // regardless. + GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s", + "File descriptor limit reached. Retrying."); + handle_->NotifyOnRead(notify_on_accept_); + // Do not schedule another timer if one is already armed. + if (retry_timer_armed_.exchange(true)) return; + // Hold a ref while the retry timer is waiting, to prevent listener + // destruction and the races that would ensue. + Ref(); + std::ignore = + engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() { + retry_timer_armed_.store(false); + if (!handle_->IsHandleShutdown()) { + handle_->SetReadable(); + } + Unref(); + }); + return; case EAGAIN: case ECONNABORTED: handle_->NotifyOnRead(notify_on_accept_); diff --git a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h index c0b793a5015..b24c2baad56 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine_listener.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine_listener.h @@ -121,6 +121,9 @@ class PosixEngineListenerImpl ListenerSocketsContainer::ListenerSocket socket_; EventHandle* handle_; PosixEngineClosure* notify_on_accept_; + // Tracks the status of a backup timer to retry accept4 calls after file + // descriptor exhaustion. + std::atomic retry_timer_armed_{false}; }; class ListenerAsyncAcceptors : public ListenerSocketsContainer { public: diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 3c292ac28d0..1640c8baf61 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -18,6 +18,8 @@ #include +#include + #include // FIXME: "posix" files shouldn't be depending on _GNU_SOURCE @@ -79,6 +81,8 @@ #include "src/core/lib/transport/error_utils.h" static std::atomic num_dropped_connections{0}; +static constexpr grpc_core::Duration kRetryAcceptWaitTime{ + grpc_core::Duration::Seconds(1)}; using ::grpc_event_engine::experimental::EndpointConfig; using ::grpc_event_engine::experimental::EventEngine; @@ -362,22 +366,38 @@ static void on_read(void* arg, grpc_error_handle err) { if (fd < 0) { if (errno == EINTR) { continue; - } else if (errno == EAGAIN || errno == ECONNABORTED || - errno == EWOULDBLOCK) { + } + // When the process runs out of fds, accept4() returns EMFILE. When this + // happens, the connection is left in the accept queue until either a + // read event triggers the on_read callback, or time has passed and the + // accept should be re-tried regardless. This callback is not cancelled, + // so a spurious wakeup may occur even when there's nothing to accept. + // This is not a performant code path, but if an fd limit has been + // reached, the system is likely in an unhappy state regardless. + if (errno == EMFILE) { + GRPC_LOG_EVERY_N_SEC(1, GPR_ERROR, "%s", + "File descriptor limit reached. Retrying."); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return; + grpc_timer_init(&sp->retry_timer, + grpc_core::Timestamp::Now() + kRetryAcceptWaitTime, + &sp->retry_closure); + return; + } + if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) { grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); return; + } + gpr_mu_lock(&sp->server->mu); + if (!sp->server->shutdown_listeners) { + gpr_log(GPR_ERROR, "Failed accept4: %s", + grpc_core::StrError(errno).c_str()); } else { - gpr_mu_lock(&sp->server->mu); - if (!sp->server->shutdown_listeners) { - gpr_log(GPR_ERROR, "Failed accept4: %s", - grpc_core::StrError(errno).c_str()); - } else { - // if we have shutdown listeners, accept4 could fail, and we - // needn't notify users - } - gpr_mu_unlock(&sp->server->mu); - goto error; + // if we have shutdown listeners, accept4 could fail, and we + // needn't notify users } + gpr_mu_unlock(&sp->server->mu); + goto error; } if (sp->server->memory_quota->IsMemoryPressureHigh()) { @@ -570,6 +590,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener, sp->port_index = listener->port_index; sp->fd_index = listener->fd_index + count - i; GPR_ASSERT(sp->emfd); + grpc_tcp_server_listener_initialize_retry_timer(sp); while (listener->server->tail->next != nullptr) { listener->server->tail = listener->server->tail->next; } @@ -818,6 +839,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { if (s->active_ports) { grpc_tcp_listener* sp; for (sp = s->head; sp; sp = sp->next) { + grpc_timer_cancel(&sp->retry_timer); grpc_fd_shutdown(sp->emfd, GRPC_ERROR_CREATE("Server shutdown")); } } diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index 7fbd772224b..afe6833e60d 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/resource_quota/memory_quota.h" // one listening port @@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener { // identified while iterating through 'next'. struct grpc_tcp_listener* sibling; int is_sibling; + // If an accept4() call fails, a timer is started to drain the accept queue in + // case no further connection attempts reach the gRPC server. + grpc_closure retry_closure; + grpc_timer retry_timer; + gpr_atm retry_timer_armed; } grpc_tcp_listener; // the overall server @@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket( // Ruturn true if the platform supports ifaddrs bool grpc_tcp_server_have_ifaddrs(void); +// Initialize (but don't start) the timer and callback to retry accept4() on a +// listening socket after file descriptors have been exhausted. This must be +// called when creating a new listener. +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener); + #endif // GRPC_SRC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 8f75a3ed5e0..23dccc76e03 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -18,6 +18,8 @@ #include +#include + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON @@ -82,6 +84,24 @@ static int get_max_accept_queue_size(void) { return s_max_accept_queue_size; } +static void listener_retry_timer_cb(void* arg, grpc_error_handle err) { + // Do nothing if cancelled. + if (!err.ok()) return; + grpc_tcp_listener* listener = static_cast(arg); + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + if (!grpc_fd_is_shutdown(listener->emfd)) { + grpc_fd_set_readable(listener->emfd); + } +} + +void grpc_tcp_server_listener_initialize_retry_timer( + grpc_tcp_listener* listener) { + gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); + grpc_timer_init_unset(&listener->retry_timer); + GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener, + grpc_schedule_on_exec_ctx); +} + static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, const grpc_resolved_address* addr, unsigned port_index, @@ -113,6 +133,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name.c_str(), true); + grpc_tcp_server_listener_initialize_retry_timer(sp); // Check and set fd as prellocated if (grpc_tcp_server_pre_allocated_fd(s) == fd) {