[Fork] Kick the EE PollPoller in prefork (#33916)

And add some trace points. This does not solve
[go/event-engine-forkable-prefork-deadlock](http://go/event-engine-forkable-prefork-deadlock),
but is a necessary step.
So ¯\\_(ツ)_/¯.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33988/head
Yijie Ma 2 years ago committed by GitHub
parent 6468b8a74f
commit 60c1701f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CMakeLists.txt
  2. 3
      build_autogenerated.yaml
  3. 2
      src/core/BUILD
  4. 16
      src/core/lib/event_engine/forkable.cc
  5. 15
      src/core/lib/event_engine/forkable.h
  6. 28
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  7. 11
      src/core/lib/event_engine/posix_engine/ev_poll_posix.h

2
CMakeLists.txt generated

@ -12229,6 +12229,7 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(forkable_test
src/core/lib/debug/trace.cc
src/core/lib/event_engine/forkable.cc
test/core/event_engine/forkable_test.cc
third_party/googletest/googletest/src/gtest-all.cc
@ -12259,6 +12260,7 @@ target_link_libraries(forkable_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::statusor
gpr
)

@ -7985,11 +7985,14 @@ targets:
build: test
language: c++
headers:
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/forkable.h
src:
- src/core/lib/debug/trace.cc
- src/core/lib/event_engine/forkable.cc
- test/core/event_engine/forkable_test.cc
deps:
- absl/status:statusor
- gpr
- name: format_request_test
gtest: true

@ -1386,6 +1386,7 @@ grpc_cc_library(
"//:config_vars",
"//:gpr",
"//:gpr_platform",
"//:grpc_trace",
],
)
@ -1742,6 +1743,7 @@ grpc_cc_library(
"common_event_engine_closures",
"event_engine_poller",
"event_engine_time_util",
"forkable",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",

@ -35,6 +35,8 @@
namespace grpc_event_engine {
namespace experimental {
grpc_core::TraceFlag grpc_trace_fork(false, "fork");
namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false};
@ -58,41 +60,52 @@ void RegisterForkHandlers() {
grpc_core::MutexLock lock(g_mu.get());
if (!std::exchange(g_registered, true)) {
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
GRPC_FORK_TRACE_LOG_STRING("RegisterForkHandlers");
pthread_atfork(PrepareFork, PostforkParent, PostforkChild);
#endif
}
}
};
}
void PrepareFork() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PrepareFork");
grpc_core::MutexLock lock(g_mu.get());
for (auto forkable_iter = g_forkables->rbegin();
forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
}
GRPC_FORK_TRACE_LOG_STRING("PrepareFork finished");
}
}
void PostforkParent() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PostforkParent");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkParent for forkable::%p", forkable);
forkable->PostforkParent();
}
GRPC_FORK_TRACE_LOG_STRING("PostforkParent finished");
}
}
void PostforkChild() {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG_STRING("PostforkChild");
grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) {
GRPC_FORK_TRACE_LOG("Calling PostforkChild for forkable::%p", forkable);
forkable->PostforkChild();
}
GRPC_FORK_TRACE_LOG_STRING("PostforkChild finished");
}
}
void ManageForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Manage forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
g_forkables->push_back(forkable);
}
@ -100,6 +113,7 @@ void ManageForkable(Forkable* forkable) {
void StopManagingForkable(Forkable* forkable) {
if (IsForkEnabled()) {
GRPC_FORK_TRACE_LOG("Stop managing forkable::%p", forkable);
grpc_core::MutexLock lock(g_mu.get());
auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());

@ -16,9 +16,24 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
namespace grpc_event_engine {
namespace experimental {
extern grpc_core::TraceFlag grpc_trace_fork;
#define GRPC_FORK_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fork)) { \
gpr_log(GPR_DEBUG, "[fork] " format, __VA_ARGS__); \
} \
} while (0)
#define GRPC_FORK_TRACE_LOG_STRING(format) GRPC_FORK_TRACE_LOG("%s", format)
// Register fork handlers with the system, enabling fork support.
//
// This provides pthread-based support for fork events. Any objects that

@ -316,7 +316,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) {
PollPoller* poller = fork_poller_list.front();
fork_poller_list.pop_front();
delete poller;
poller->Close();
}
gpr_mu_unlock(&fork_fd_list_mu);
InitPollPollerPosix();
@ -566,6 +566,9 @@ bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
void PollPoller::KickExternal(bool ext) {
grpc_core::MutexLock lock(&mu_);
if (closed_) {
return;
}
if (was_kicked_) {
if (ext) {
was_kicked_ext_ = true;
@ -610,7 +613,8 @@ PollPoller::PollPoller(Scheduler* scheduler)
was_kicked_(false),
was_kicked_ext_(false),
num_poll_handles_(0),
poll_handles_list_head_(nullptr) {
poll_handles_list_head_(nullptr),
closed_(false) {
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
ForkPollerListAddPoller(this);
@ -622,7 +626,8 @@ PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll)
was_kicked_(false),
was_kicked_ext_(false),
num_poll_handles_(0),
poll_handles_list_head_(nullptr) {
poll_handles_list_head_(nullptr),
closed_(false) {
wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr);
ForkPollerListAddPoller(this);
@ -829,6 +834,17 @@ void PollPoller::Shutdown() {
Unref();
}
void PollPoller::PrepareFork() { Kick(); }
// TODO(vigneshbabu): implement
void PollPoller::PostforkParent() {}
// TODO(vigneshbabu): implement
void PollPoller::PostforkChild() {}
void PollPoller::Close() {
grpc_core::MutexLock lock(&mu_);
closed_ = true;
}
PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) {
static bool kPollPollerSupported = InitPollPollerPosix();
if (kPollPollerSupported) {
@ -875,6 +891,12 @@ PollPoller* MakePollPoller(Scheduler* /*scheduler*/,
return nullptr;
}
void PollPoller::PrepareFork() { grpc_core::Crash("unimplemented"); }
void PollPoller::PostforkParent() { grpc_core::Crash("unimplemented"); }
void PollPoller::PostforkChild() { grpc_core::Crash("unimplemented"); }
void PollPoller::Close() { grpc_core::Crash("unimplemented"); }
void PollPoller::KickExternal(bool /*ext*/) {
grpc_core::Crash("unimplemented");
}

@ -27,6 +27,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
@ -38,7 +39,7 @@ namespace experimental {
class PollEventHandle;
// Definition of poll based poller.
class PollPoller : public PosixEventPoller {
class PollPoller : public PosixEventPoller, public Forkable {
public:
explicit PollPoller(Scheduler* scheduler);
PollPoller(Scheduler* scheduler, bool use_phony_poll);
@ -54,6 +55,13 @@ class PollPoller : public PosixEventPoller {
bool CanTrackErrors() const override { return false; }
~PollPoller() override;
// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;
void Close();
private:
void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
@ -83,6 +91,7 @@ class PollPoller : public PosixEventPoller {
int num_poll_handles_ ABSL_GUARDED_BY(mu_);
PollEventHandle* poll_handles_list_head_ ABSL_GUARDED_BY(mu_) = nullptr;
std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_ ABSL_GUARDED_BY(mu_);
};
// Return an instance of a poll based poller tied to the specified scheduler.

Loading…
Cancel
Save