diff --git a/CMakeLists.txt b/CMakeLists.txt index 0392d456561..c0e5a3fb9e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12229,6 +12229,7 @@ endif() if(gRPC_BUILD_TESTS) add_executable(forkable_test + src/core/lib/debug/trace.cc src/core/lib/event_engine/forkable.cc test/core/event_engine/forkable_test.cc third_party/googletest/googletest/src/gtest-all.cc @@ -12259,6 +12260,7 @@ target_link_libraries(forkable_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ZLIB_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} + absl::statusor gpr ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index da1291c2353..82e6d6438c7 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7985,11 +7985,14 @@ targets: build: test language: c++ headers: + - src/core/lib/debug/trace.h - src/core/lib/event_engine/forkable.h src: + - src/core/lib/debug/trace.cc - src/core/lib/event_engine/forkable.cc - test/core/event_engine/forkable_test.cc deps: + - absl/status:statusor - gpr - name: format_request_test gtest: true diff --git a/src/core/BUILD b/src/core/BUILD index 9a7a2c179a7..cda8d1c32ee 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1386,6 +1386,7 @@ grpc_cc_library( "//:config_vars", "//:gpr", "//:gpr_platform", + "//:grpc_trace", ], ) @@ -1742,6 +1743,7 @@ grpc_cc_library( "common_event_engine_closures", "event_engine_poller", "event_engine_time_util", + "forkable", "iomgr_port", "posix_event_engine_closure", "posix_event_engine_event_poller", diff --git a/src/core/lib/event_engine/forkable.cc b/src/core/lib/event_engine/forkable.cc index 4906084d274..57f60577d10 100644 --- a/src/core/lib/event_engine/forkable.cc +++ b/src/core/lib/event_engine/forkable.cc @@ -35,6 +35,8 @@ namespace grpc_event_engine { namespace experimental { +grpc_core::TraceFlag grpc_trace_fork(false, "fork"); + namespace { grpc_core::NoDestruct g_mu; bool g_registered ABSL_GUARDED_BY(g_mu){false}; @@ -58,41 +60,52 @@ void RegisterForkHandlers() { grpc_core::MutexLock lock(g_mu.get()); if (!std::exchange(g_registered, true)) { #ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + GRPC_FORK_TRACE_LOG_STRING("RegisterForkHandlers"); pthread_atfork(PrepareFork, PostforkParent, PostforkChild); #endif } } -}; +} void PrepareFork() { if (IsForkEnabled()) { + GRPC_FORK_TRACE_LOG_STRING("PrepareFork"); grpc_core::MutexLock lock(g_mu.get()); for (auto forkable_iter = g_forkables->rbegin(); forkable_iter != g_forkables->rend(); ++forkable_iter) { (*forkable_iter)->PrepareFork(); } + GRPC_FORK_TRACE_LOG_STRING("PrepareFork finished"); } } + void PostforkParent() { if (IsForkEnabled()) { + GRPC_FORK_TRACE_LOG_STRING("PostforkParent"); grpc_core::MutexLock lock(g_mu.get()); for (auto* forkable : *g_forkables) { + GRPC_FORK_TRACE_LOG("Calling PostforkParent for forkable::%p", forkable); forkable->PostforkParent(); } + GRPC_FORK_TRACE_LOG_STRING("PostforkParent finished"); } } void PostforkChild() { if (IsForkEnabled()) { + GRPC_FORK_TRACE_LOG_STRING("PostforkChild"); grpc_core::MutexLock lock(g_mu.get()); for (auto* forkable : *g_forkables) { + GRPC_FORK_TRACE_LOG("Calling PostforkChild for forkable::%p", forkable); forkable->PostforkChild(); } + GRPC_FORK_TRACE_LOG_STRING("PostforkChild finished"); } } void ManageForkable(Forkable* forkable) { if (IsForkEnabled()) { + GRPC_FORK_TRACE_LOG("Manage forkable::%p", forkable); grpc_core::MutexLock lock(g_mu.get()); g_forkables->push_back(forkable); } @@ -100,6 +113,7 @@ void ManageForkable(Forkable* forkable) { void StopManagingForkable(Forkable* forkable) { if (IsForkEnabled()) { + GRPC_FORK_TRACE_LOG("Stop managing forkable::%p", forkable); grpc_core::MutexLock lock(g_mu.get()); auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable); GPR_ASSERT(iter != g_forkables->end()); diff --git a/src/core/lib/event_engine/forkable.h b/src/core/lib/event_engine/forkable.h index 17fed628e15..c585a830988 100644 --- a/src/core/lib/event_engine/forkable.h +++ b/src/core/lib/event_engine/forkable.h @@ -16,9 +16,24 @@ #include +#include + +#include "src/core/lib/debug/trace.h" + namespace grpc_event_engine { namespace experimental { +extern grpc_core::TraceFlag grpc_trace_fork; + +#define GRPC_FORK_TRACE_LOG(format, ...) \ + do { \ + if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fork)) { \ + gpr_log(GPR_DEBUG, "[fork] " format, __VA_ARGS__); \ + } \ + } while (0) + +#define GRPC_FORK_TRACE_LOG_STRING(format) GRPC_FORK_TRACE_LOG("%s", format) + // Register fork handlers with the system, enabling fork support. // // This provides pthread-based support for fork events. Any objects that diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index fdb4ed09bec..56fd15c26f0 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -316,7 +316,7 @@ void ResetEventManagerOnFork() { while (!fork_poller_list.empty()) { PollPoller* poller = fork_poller_list.front(); fork_poller_list.pop_front(); - delete poller; + poller->Close(); } gpr_mu_unlock(&fork_fd_list_mu); InitPollPollerPosix(); @@ -566,6 +566,9 @@ bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) { void PollPoller::KickExternal(bool ext) { grpc_core::MutexLock lock(&mu_); + if (closed_) { + return; + } if (was_kicked_) { if (ext) { was_kicked_ext_ = true; @@ -610,7 +613,8 @@ PollPoller::PollPoller(Scheduler* scheduler) was_kicked_(false), was_kicked_ext_(false), num_poll_handles_(0), - poll_handles_list_head_(nullptr) { + poll_handles_list_head_(nullptr), + closed_(false) { wakeup_fd_ = *CreateWakeupFd(); GPR_ASSERT(wakeup_fd_ != nullptr); ForkPollerListAddPoller(this); @@ -622,7 +626,8 @@ PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll) was_kicked_(false), was_kicked_ext_(false), num_poll_handles_(0), - poll_handles_list_head_(nullptr) { + poll_handles_list_head_(nullptr), + closed_(false) { wakeup_fd_ = *CreateWakeupFd(); GPR_ASSERT(wakeup_fd_ != nullptr); ForkPollerListAddPoller(this); @@ -829,6 +834,17 @@ void PollPoller::Shutdown() { Unref(); } +void PollPoller::PrepareFork() { Kick(); } +// TODO(vigneshbabu): implement +void PollPoller::PostforkParent() {} +// TODO(vigneshbabu): implement +void PollPoller::PostforkChild() {} + +void PollPoller::Close() { + grpc_core::MutexLock lock(&mu_); + closed_ = true; +} + PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) { static bool kPollPollerSupported = InitPollPollerPosix(); if (kPollPollerSupported) { @@ -875,6 +891,12 @@ PollPoller* MakePollPoller(Scheduler* /*scheduler*/, return nullptr; } +void PollPoller::PrepareFork() { grpc_core::Crash("unimplemented"); } +void PollPoller::PostforkParent() { grpc_core::Crash("unimplemented"); } +void PollPoller::PostforkChild() { grpc_core::Crash("unimplemented"); } + +void PollPoller::Close() { grpc_core::Crash("unimplemented"); } + void PollPoller::KickExternal(bool /*ext*/) { grpc_core::Crash("unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h index dac37ea130d..582991b7c7a 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h @@ -27,6 +27,7 @@ #include +#include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" @@ -38,7 +39,7 @@ namespace experimental { class PollEventHandle; // Definition of poll based poller. -class PollPoller : public PosixEventPoller { +class PollPoller : public PosixEventPoller, public Forkable { public: explicit PollPoller(Scheduler* scheduler); PollPoller(Scheduler* scheduler, bool use_phony_poll); @@ -54,6 +55,13 @@ class PollPoller : public PosixEventPoller { bool CanTrackErrors() const override { return false; } ~PollPoller() override; + // Forkable + void PrepareFork() override; + void PostforkParent() override; + void PostforkChild() override; + + void Close(); + private: void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); } void Unref() { @@ -83,6 +91,7 @@ class PollPoller : public PosixEventPoller { int num_poll_handles_ ABSL_GUARDED_BY(mu_); PollEventHandle* poll_handles_list_head_ ABSL_GUARDED_BY(mu_) = nullptr; std::unique_ptr wakeup_fd_; + bool closed_ ABSL_GUARDED_BY(mu_); }; // Return an instance of a poll based poller tied to the specified scheduler.