diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71b..16562538a63 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
   grpc_fd* fd;
 } grpc_fd_watcher;
 
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+  /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+     set to nullptr. */
+  grpc_fd* fd;
+  grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+  grpc_fork_fd_list* next;
+  grpc_fork_fd_list* prev;
+};
+
 struct grpc_fd {
   int fd;
   /* refst format:
@@ -108,8 +121,18 @@ struct grpc_fd {
   grpc_closure* on_done_closure;
 
   grpc_iomgr_object iomgr_object;
+
+  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+  grpc_fork_fd_list* fork_fd_list;
 };
 
+/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
+static bool track_fds_for_fork = false;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
 /* Begin polling on an fd.
    Registers that the given pollset is interested in this fd - so that if read
    or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
 typedef struct grpc_cached_wakeup_fd {
   grpc_wakeup_fd fd;
   struct grpc_cached_wakeup_fd* next;
+
+  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+  grpc_fork_fd_list* fork_fd_list;
 } grpc_cached_wakeup_fd;
 
 struct grpc_pollset_worker {
@@ -281,9 +307,61 @@ poll_hash_table poll_cache;
 grpc_cv_fd_table g_cvfds;
 
 /*******************************************************************************
- * fd_posix.c
+ * functions to track opened fds. No-ops unless track_fds_for_fork is true.
  */
 
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+  if (track_fds_for_fork) {
+    gpr_mu_lock(&fork_fd_list_mu);
+    if (fork_fd_list_head == node) {
+      fork_fd_list_head = node->next;
+    }
+    if (node->prev != nullptr) {
+      node->prev->next = node->next;
+    }
+    if (node->next != nullptr) {
+      node->next->prev = node->prev;
+    }
+    gpr_free(node);
+    gpr_mu_unlock(&fork_fd_list_mu);
+  }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+  gpr_mu_lock(&fork_fd_list_mu);
+  node->next = fork_fd_list_head;
+  node->prev = nullptr;
+  if (fork_fd_list_head != nullptr) {
+    fork_fd_list_head->prev = node;
+  }
+  fork_fd_list_head = node;
+  gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+  if (track_fds_for_fork) {
+    fd->fork_fd_list =
+        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+    fd->fork_fd_list->fd = fd;
+    fd->fork_fd_list->cached_wakeup_fd = nullptr;
+    fork_fd_list_add_node(fd->fork_fd_list);
+  }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+  if (track_fds_for_fork) {
+    fd->fork_fd_list =
+        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+    fd->fork_fd_list->cached_wakeup_fd = fd;
+    fd->fork_fd_list->fd = nullptr;
+    fork_fd_list_add_node(fd->fork_fd_list);
+  }
+}
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
 #ifndef NDEBUG
 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
@@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
   if (old == n) {
     gpr_mu_destroy(&fd->mu);
     grpc_iomgr_unregister_object(&fd->iomgr_object);
+    fork_fd_list_remove_node(fd->fork_fd_list);
     if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
     gpr_free(fd);
   } else {
@@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
   gpr_asprintf(&name2, "%s fd=%d", name, fd);
   grpc_iomgr_register_object(&r->iomgr_object, name2);
   gpr_free(name2);
+  fork_fd_list_add_grpc_fd(r);
   return r;
 }
 
@@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
   GPR_ASSERT(!pollset_has_workers(pollset));
   while (pollset->local_wakeup_cache) {
     grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+    fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
     gpr_free(pollset->local_wakeup_cache);
     pollset->local_wakeup_cache = next;
@@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
     worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
        gpr_malloc(sizeof(*worker.wakeup_fd)));
     error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+    fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
     if (error != GRPC_ERROR_NONE) {
       GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
       return error;
@@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
   if (grpc_cv_wakeup_fds_enabled()) {
     global_cv_fd_table_shutdown();
   }
+  if (track_fds_for_fork) {
+    gpr_mu_destroy(&fork_fd_list_mu);
+    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+  }
 }
 
 static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
     shutdown_engine,
 };
 
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+  gpr_mu_lock(&fork_fd_list_mu);
+  while (fork_fd_list_head != nullptr) {
+    if (fork_fd_list_head->fd != nullptr) {
+      close(fork_fd_list_head->fd->fd);
+      fork_fd_list_head->fd->fd = -1;
+    } else {
+      close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+      fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+      close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+      fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+    }
+    fork_fd_list_head = fork_fd_list_head->next;
+  }
+  gpr_mu_unlock(&fork_fd_list_mu);
+}
+
 const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
   if (!grpc_has_wakeup_fd()) {
     gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
     return nullptr;
   }
+  if (grpc_core::Fork::Enabled()) {
+    track_fds_for_fork = true;
+    gpr_mu_init(&fork_fd_list_mu);
+    grpc_core::Fork::SetResetChildPollingEngineFunc(
+        reset_event_manager_on_fork);
+  }
   return &vtable;
 }
 
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index ac85c81de2d..e957bad73d3 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -58,6 +58,12 @@ void grpc_prefork() {
             "environment variable GRPC_ENABLE_FORK_SUPPORT=1");
     return;
   }
+  if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
+      strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
+    gpr_log(GPR_ERROR,
+            "Fork support is only compatible with the epoll1 and poll polling "
+            "strategies");
+  }
   if (!grpc_core::Fork::BlockExecCtx()) {
     gpr_log(GPR_INFO,
             "Other threads are currently calling into gRPC, skipping fork() "
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
index 0d2516977b8..433ae1f374f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
@@ -37,8 +37,6 @@ _GRPC_ENABLE_FORK_SUPPORT = (
     os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
     .lower() in _TRUE_VALUES)
 
-_GRPC_POLL_STRATEGY = os.environ.get('GRPC_POLL_STRATEGY')
-
 cdef void __prefork() nogil:
     with gil:
         with _fork_state.fork_in_progress_condition:
@@ -82,13 +80,6 @@ cdef void __postfork_child() nogil:
 def fork_handlers_and_grpc_init():
     grpc_init()
     if _GRPC_ENABLE_FORK_SUPPORT:
-        # TODO(ericgribkoff) epoll1 is default for grpcio distribution. Decide whether to expose
-        # grpc_get_poll_strategy_name() from ev_posix.cc to get actual polling choice.
-        if _GRPC_POLL_STRATEGY is not None and _GRPC_POLL_STRATEGY != "epoll1":
-            _LOGGER.error(
-                'gRPC Python fork support is only compatible with the epoll1 '
-                'polling engine')
-            return
         with _fork_state.fork_handler_registered_lock:
             if not _fork_state.fork_handler_registered:
                 pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
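
The mechanism above boils down to a mutex-guarded doubly-linked list of every
open fd, drained by the child's post-fork handler. The following is a minimal
standalone sketch of that pattern, not part of the patch: the names
(tracked_fd, track_fd, untrack_fd, close_tracked_fds_in_child) are
illustrative only, and it assumes a POSIX platform with pthreads.

#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* One node per tracked fd, mirroring grpc_fork_fd_list. */
typedef struct tracked_fd {
  int fd;
  struct tracked_fd* next;
  struct tracked_fd* prev;
} tracked_fd;

static pthread_mutex_t g_list_mu = PTHREAD_MUTEX_INITIALIZER;
static tracked_fd* g_list_head = NULL;

/* Push a node onto the head of the list (cf. fork_fd_list_add_node). */
static tracked_fd* track_fd(int fd) {
  tracked_fd* node = (tracked_fd*)malloc(sizeof(*node));
  node->fd = fd;
  pthread_mutex_lock(&g_list_mu);
  node->prev = NULL;
  node->next = g_list_head;
  if (g_list_head != NULL) g_list_head->prev = node;
  g_list_head = node;
  pthread_mutex_unlock(&g_list_mu);
  return node;
}

/* Unlink and free a node (cf. fork_fd_list_remove_node). */
static void untrack_fd(tracked_fd* node) {
  pthread_mutex_lock(&g_list_mu);
  if (g_list_head == node) g_list_head = node->next;
  if (node->prev != NULL) node->prev->next = node->next;
  if (node->next != NULL) node->next->prev = node->prev;
  pthread_mutex_unlock(&g_list_mu);
  free(node);
}

/* Child-side atfork handler: walk the list, close every tracked fd, and mark
 * it -1, the same walk reset_event_manager_on_fork performs. This demo is
 * single-threaded at fork time, so no lock is taken here; the patch itself
 * takes fork_fd_list_mu. */
static void close_tracked_fds_in_child(void) {
  for (tracked_fd* n = g_list_head; n != NULL; n = n->next) {
    close(n->fd);
    n->fd = -1;
  }
}

int main(void) {
  pthread_atfork(NULL, NULL, close_tracked_fds_in_child);
  tracked_fd* t = track_fd(open("/dev/null", O_RDONLY));
  if (fork() == 0) {
    /* The child handler already ran: the fd is closed in the child only. */
    printf("child sees fd %d\n", t->fd); /* prints -1 */
    _exit(0);
  }
  printf("parent sees fd %d\n", t->fd); /* still open in the parent */
  close(t->fd);
  untrack_fd(t);
  return 0;
}

As in the patch, the child handler only closes descriptors and overwrites them
with -1; it does not free list nodes, so pointers held elsewhere in the child's
copy of the heap remain valid while the parent's fds and RPCs are untouched.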