[fork] simplify Fork::SetResetChildPollingEngineFunc to fix nested forking (#33495)

Noticed this in a nested forking test in
https://github.com/grpc/grpc/pull/33430 (by nested fork - forked child
process forks again).

Before this change, the EventEngine and non-EventEngine pollers were
competing with each other in their calls to
`Fork::SetResetChildPollingEngineFunc`. In the first child's after-fork
handler, the EE engine would [clear
out](123da4a866/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc (L260))
the post-fork handlers of the non-EE poller, leaving the grandchild
without the right post-fork cleanup method.
pull/33508/head
apolcyn 2 years ago committed by GitHub
parent cd4ff81b3f
commit e9c44836bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  2. 10
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  3. 17
      src/core/lib/gprpp/fork.cc
  4. 11
      src/core/lib/gprpp/fork.h
  5. 12
      src/core/lib/iomgr/ev_epoll1_linux.cc
  6. 9
      src/core/lib/iomgr/ev_poll_posix.cc

@ -255,10 +255,6 @@ void ResetEventManagerOnFork() {
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
}
InitEpoll1PollerLinux();
}
@ -273,8 +269,10 @@ bool InitEpoll1PollerLinux() {
return false;
}
if (grpc_core::Fork::Enabled()) {
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork);
if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
ResetEventManagerOnFork)) {
gpr_mu_init(&fork_fd_list_mu);
}
}
close(fd);
return true;

@ -319,10 +319,6 @@ void ResetEventManagerOnFork() {
delete poller;
}
gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
}
InitPollPollerPosix();
}
@ -333,8 +329,10 @@ bool InitPollPollerPosix() {
return false;
}
if (grpc_core::Fork::Enabled()) {
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork);
if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
ResetEventManagerOnFork)) {
gpr_mu_init(&fork_fd_list_mu);
}
}
return true;
}

@ -20,6 +20,8 @@
#include "src/core/lib/gprpp/fork.h"
#include <utility>
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@ -189,19 +191,16 @@ void Fork::DoDecExecCtxCount() {
NoDestructSingleton<ExecCtxState>::Get()->DecExecCtxCount();
}
void Fork::SetResetChildPollingEngineFunc(
bool Fork::RegisterResetChildPollingEngineFunc(
Fork::child_postfork_func reset_child_polling_engine) {
if (reset_child_polling_engine_ == nullptr) {
reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
}
if (reset_child_polling_engine == nullptr) {
reset_child_polling_engine_->clear();
} else {
reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
reset_child_polling_engine_ = new std::set<Fork::child_postfork_func>();
}
auto ret = reset_child_polling_engine_->insert(reset_child_polling_engine);
return ret.second;
}
const std::vector<Fork::child_postfork_func>&
const std::set<Fork::child_postfork_func>&
Fork::GetResetChildPollingEngineFunc() {
return *reset_child_polling_engine_;
}
@ -238,6 +237,6 @@ void Fork::AwaitThreads() {
std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
std::set<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
nullptr;
} // namespace grpc_core

@ -22,7 +22,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <vector>
#include <set>
//
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
@ -57,10 +57,11 @@ class Fork {
// Provide a function that will be invoked in the child's postfork handler to
// reset the polling engine's internal state.
static void SetResetChildPollingEngineFunc(
// Returns true if reset_child_polling_engine was not previously registered,
// otherwise returns false and does nothing.
static bool RegisterResetChildPollingEngineFunc(
child_postfork_func reset_child_polling_engine);
static const std::vector<child_postfork_func>&
GetResetChildPollingEngineFunc();
static const std::set<child_postfork_func>& GetResetChildPollingEngineFunc();
// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
@ -89,7 +90,7 @@ class Fork {
static std::atomic<bool> support_enabled_;
static bool override_enabled_;
static std::vector<child_postfork_func>* reset_child_polling_engine_;
static std::set<child_postfork_func>* reset_child_polling_engine_;
};
} // namespace grpc_core

@ -1236,10 +1236,6 @@ static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
epoll_set_shutdown();
if (grpc_core::Fork::Enabled()) {
gpr_mu_destroy(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
}
g_is_shutdown = true;
}
@ -1296,6 +1292,7 @@ const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
// the global epoll fd. This allows gRPC to shutdown in the child process
// without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
if (g_is_shutdown) return;
gpr_mu_lock(&fork_fd_list_mu);
while (fork_fd_list_head != nullptr) {
close(fork_fd_list_head->fd);
@ -1330,9 +1327,10 @@ static bool init_epoll1_linux() {
}
if (grpc_core::Fork::Enabled()) {
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
reset_event_manager_on_fork)) {
gpr_mu_init(&fork_fd_list_mu);
}
}
g_is_shutdown = false;
return true;

@ -1404,10 +1404,11 @@ const grpc_event_engine_vtable grpc_ev_poll_posix = {
return false;
}
if (grpc_core::Fork::Enabled()) {
track_fds_for_fork = true;
gpr_mu_init(&fork_fd_list_mu);
grpc_core::Fork::SetResetChildPollingEngineFunc(
reset_event_manager_on_fork);
if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
reset_event_manager_on_fork)) {
track_fds_for_fork = true;
gpr_mu_init(&fork_fd_list_mu);
}
}
return true;
},

Loading…
Cancel
Save