Fix Python epoll1 Fork Support (#32196)

* WIP. A seemingly properly failing test

* WIP. Pre-fork handlers now work

* Roughly working.

* Clean up

* Clean up more

* Add to CI

* Format

* Ugh. Remove swap file

* And another

* clean up

* Add copyright

* Format

* Remove another debug line

* Add stub forkable methods

* Remove use of 3.9+ function

* Remove unintentional double copyright

* drfloob review comments

* Only hold lock during Close once

* Create separate job for fork test

* Bump up gdb timeout

* Format
pull/32278/head
Richard Belleville 2 years ago committed by GitHub
parent 7d8a978aa0
commit 190d095a62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      src/core/BUILD
  2. 17
      src/core/lib/event_engine/forkable.cc
  3. 42
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  4. 12
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
  5. 7
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  6. 1
      src/core/lib/event_engine/posix_engine/posix_engine.h
  7. 18
      src/core/lib/gprpp/fork.cc
  8. 6
      src/core/lib/gprpp/fork.h
  9. 9
      src/core/lib/iomgr/fork_posix.cc
  10. 37
      src/python/grpcio_tests/tests/fork/BUILD.bazel
  11. 110
      src/python/grpcio_tests/tests/fork/_fork_interop_test.py
  12. 131
      src/python/grpcio_tests/tests/fork/methods.py
  13. 33
      src/python/grpcio_tests/tests/fork/native_debug.pyx
  14. 5
      tools/bazel.rc
  15. 26
      tools/internal_ci/linux/grpc_python_bazel_test_fork_in_docker.sh
  16. 6
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh
  17. 30
      tools/internal_ci/linux/grpc_python_fork_bazel.cfg

@ -1685,6 +1685,7 @@ grpc_cc_library(
deps = [ deps = [
"event_engine_poller", "event_engine_poller",
"event_engine_time_util", "event_engine_time_util",
"forkable",
"iomgr_port", "iomgr_port",
"posix_event_engine_closure", "posix_event_engine_closure",
"posix_event_engine_event_poller", "posix_event_engine_event_poller",

@ -31,8 +31,10 @@ namespace experimental {
namespace { namespace {
grpc_core::NoDestruct<grpc_core::Mutex> g_mu; grpc_core::NoDestruct<grpc_core::Mutex> g_mu;
bool g_registered ABSL_GUARDED_BY(g_mu){false}; bool g_registered ABSL_GUARDED_BY(g_mu){false};
grpc_core::NoDestruct<absl::flat_hash_set<Forkable*>> g_forkables
ABSL_GUARDED_BY(g_mu); // This must be ordered because there are ordering dependencies between
// certain fork handlers.
grpc_core::NoDestruct<std::vector<Forkable*>> g_forkables ABSL_GUARDED_BY(g_mu);
} // namespace } // namespace
Forkable::Forkable() { ManageForkable(this); } Forkable::Forkable() { ManageForkable(this); }
@ -48,8 +50,9 @@ void RegisterForkHandlers() {
void PrepareFork() { void PrepareFork() {
grpc_core::MutexLock lock(g_mu.get()); grpc_core::MutexLock lock(g_mu.get());
for (auto* forkable : *g_forkables) { for (auto forkable_iter = g_forkables->rbegin();
forkable->PrepareFork(); forkable_iter != g_forkables->rend(); ++forkable_iter) {
(*forkable_iter)->PrepareFork();
} }
} }
void PostforkParent() { void PostforkParent() {
@ -68,12 +71,14 @@ void PostforkChild() {
void ManageForkable(Forkable* forkable) { void ManageForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get()); grpc_core::MutexLock lock(g_mu.get());
g_forkables->insert(forkable); g_forkables->push_back(forkable);
} }
void StopManagingForkable(Forkable* forkable) { void StopManagingForkable(Forkable* forkable) {
grpc_core::MutexLock lock(g_mu.get()); grpc_core::MutexLock lock(g_mu.get());
g_forkables->erase(forkable); auto iter = std::find(g_forkables->begin(), g_forkables->end(), forkable);
GPR_ASSERT(iter != g_forkables->end());
g_forkables->erase(iter);
} }
} // namespace experimental } // namespace experimental

@ -252,7 +252,7 @@ void ResetEventManagerOnFork() {
while (!fork_poller_list.empty()) { while (!fork_poller_list.empty()) {
Epoll1Poller* poller = fork_poller_list.front(); Epoll1Poller* poller = fork_poller_list.front();
fork_poller_list.pop_front(); fork_poller_list.pop_front();
delete poller; poller->Close();
} }
gpr_mu_unlock(&fork_fd_list_mu); gpr_mu_unlock(&fork_fd_list_mu);
if (grpc_core::Fork::Enabled()) { if (grpc_core::Fork::Enabled()) {
@ -354,7 +354,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
} }
Epoll1Poller::Epoll1Poller(Scheduler* scheduler) Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
: scheduler_(scheduler), was_kicked_(false) { : scheduler_(scheduler), was_kicked_(false), closed_(false) {
g_epoll_set_.epfd = EpollCreateAndCloexec(); g_epoll_set_.epfd = EpollCreateAndCloexec();
wakeup_fd_ = *CreateWakeupFd(); wakeup_fd_ = *CreateWakeupFd();
GPR_ASSERT(wakeup_fd_ != nullptr); GPR_ASSERT(wakeup_fd_ != nullptr);
@ -375,22 +375,26 @@ void Epoll1Poller::Shutdown() {
delete this; delete this;
} }
Epoll1Poller::~Epoll1Poller() { void Epoll1Poller::Close() {
grpc_core::MutexLock lock(&mu_);
if (closed_) return;
if (g_epoll_set_.epfd >= 0) { if (g_epoll_set_.epfd >= 0) {
close(g_epoll_set_.epfd); close(g_epoll_set_.epfd);
g_epoll_set_.epfd = -1; g_epoll_set_.epfd = -1;
} }
{
grpc_core::MutexLock lock(&mu_); while (!free_epoll1_handles_list_.empty()) {
while (!free_epoll1_handles_list_.empty()) { Epoll1EventHandle* handle =
Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>( reinterpret_cast<Epoll1EventHandle*>(free_epoll1_handles_list_.front());
free_epoll1_handles_list_.front()); free_epoll1_handles_list_.pop_front();
free_epoll1_handles_list_.pop_front(); delete handle;
delete handle;
}
} }
closed_ = true;
} }
Epoll1Poller::~Epoll1Poller() { Close(); }
EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/, EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
bool track_err) { bool track_err) {
Epoll1EventHandle* new_handle = nullptr; Epoll1EventHandle* new_handle = nullptr;
@ -556,7 +560,7 @@ Poller::WorkResult Epoll1Poller::Work(
void Epoll1Poller::Kick() { void Epoll1Poller::Kick() {
grpc_core::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
if (was_kicked_) { if (was_kicked_ || closed_) {
return; return;
} }
was_kicked_ = true; was_kicked_ = true;
@ -571,6 +575,14 @@ Epoll1Poller* MakeEpoll1Poller(Scheduler* scheduler) {
return nullptr; return nullptr;
} }
void Epoll1Poller::PrepareFork() { Kick(); }
// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkParent() {}
// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkChild() {}
} // namespace experimental } // namespace experimental
} // namespace grpc_event_engine } // namespace grpc_event_engine
@ -617,6 +629,12 @@ void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }
// nullptr. // nullptr.
Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; } Epoll1Poller* MakeEpoll1Poller(Scheduler* /*scheduler*/) { return nullptr; }
void Epoll1Poller::PrepareFork() {}
void Epoll1Poller::PostforkParent() {}
void Epoll1Poller::PostforkChild() {}
} // namespace experimental } // namespace experimental
} // namespace grpc_event_engine } // namespace grpc_event_engine

@ -27,6 +27,7 @@
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/internal_errqueue.h" #include "src/core/lib/event_engine/posix_engine/internal_errqueue.h"
@ -46,7 +47,7 @@ namespace experimental {
class Epoll1EventHandle; class Epoll1EventHandle;
// Definition of epoll1 based poller. // Definition of epoll1 based poller.
class Epoll1Poller : public PosixEventPoller { class Epoll1Poller : public PosixEventPoller, public Forkable {
public: public:
explicit Epoll1Poller(Scheduler* scheduler); explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name, EventHandle* CreateHandle(int fd, absl::string_view name,
@ -67,6 +68,13 @@ class Epoll1Poller : public PosixEventPoller {
} }
~Epoll1Poller() override; ~Epoll1Poller() override;
// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;
void Close();
private: private:
// This initial vector size may need to be tuned // This initial vector size may need to be tuned
using Events = absl::InlinedVector<Epoll1EventHandle*, 5>; using Events = absl::InlinedVector<Epoll1EventHandle*, 5>;
@ -79,6 +87,7 @@ class Epoll1Poller : public PosixEventPoller {
// on file descriptors that became readable/writable. // on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle, bool ProcessEpollEvents(int max_epoll_events_to_handle,
Events& pending_events); Events& pending_events);
// Do epoll_wait and store the events in g_epoll_set.events field. This does // Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents(). // not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number // See ProcessEpollEvents() function for more details. It returns the number
@ -117,6 +126,7 @@ class Epoll1Poller : public PosixEventPoller {
bool was_kicked_ ABSL_GUARDED_BY(mu_); bool was_kicked_ ABSL_GUARDED_BY(mu_);
std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_); std::list<EventHandle*> free_epoll1_handles_list_ ABSL_GUARDED_BY(mu_);
std::unique_ptr<WakeupFd> wakeup_fd_; std::unique_ptr<WakeupFd> wakeup_fd_;
bool closed_;
}; };
// Return an instance of a epoll1 based poller tied to the specified event // Return an instance of a epoll1 based poller tied to the specified event

@ -293,7 +293,8 @@ void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
PosixEnginePollerManager::PosixEnginePollerManager( PosixEnginePollerManager::PosixEnginePollerManager(
std::shared_ptr<ThreadPool> executor) std::shared_ptr<ThreadPool> executor)
: poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)), : poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
executor_(std::move(executor)) {} executor_(std::move(executor)),
trigger_shutdown_called_(false) {}
PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller) PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller), : poller_(poller),
@ -316,6 +317,8 @@ void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
} }
void PosixEnginePollerManager::TriggerShutdown() { void PosixEnginePollerManager::TriggerShutdown() {
GPR_DEBUG_ASSERT(trigger_shutdown_called_ == false);
trigger_shutdown_called_ = true;
// If the poller is external, dont try to shut it down. Otherwise // If the poller is external, dont try to shut it down. Otherwise
// set poller state to PollerState::kShuttingDown. // set poller state to PollerState::kShuttingDown.
if (poller_state_.exchange(PollerState::kShuttingDown) == if (poller_state_.exchange(PollerState::kShuttingDown) ==
@ -347,6 +350,8 @@ PosixEventEngine::PosixEventEngine()
timer_manager_(executor_) { timer_manager_(executor_) {
if (NeedPosixEngine()) { if (NeedPosixEngine()) {
poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_); poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
// The threadpool must be instantiated after the poller otherwise, the
// process will deadlock when forking.
if (poller_manager_->Poller() != nullptr) { if (poller_manager_->Poller() != nullptr) {
executor_->Run([poller_manager = poller_manager_]() { executor_->Run([poller_manager = poller_manager_]() {
PollerWorkInternal(poller_manager); PollerWorkInternal(poller_manager);

@ -125,6 +125,7 @@ class PosixEnginePollerManager
grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr; grpc_event_engine::experimental::PosixEventPoller* poller_ = nullptr;
std::atomic<PollerState> poller_state_{PollerState::kOk}; std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_; std::shared_ptr<ThreadPool> executor_;
bool trigger_shutdown_called_;
}; };
#endif // GRPC_POSIX_SOCKET_TCP #endif // GRPC_POSIX_SOCKET_TCP

@ -197,10 +197,19 @@ void Fork::DoDecExecCtxCount() {
void Fork::SetResetChildPollingEngineFunc( void Fork::SetResetChildPollingEngineFunc(
Fork::child_postfork_func reset_child_polling_engine) { Fork::child_postfork_func reset_child_polling_engine) {
reset_child_polling_engine_ = reset_child_polling_engine; if (reset_child_polling_engine_ == nullptr) {
reset_child_polling_engine_ = new std::vector<Fork::child_postfork_func>();
}
if (reset_child_polling_engine == nullptr) {
reset_child_polling_engine_->clear();
} else {
reset_child_polling_engine_->emplace_back(reset_child_polling_engine);
}
} }
Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
return reset_child_polling_engine_; const std::vector<Fork::child_postfork_func>&
Fork::GetResetChildPollingEngineFunc() {
return *reset_child_polling_engine_;
} }
bool Fork::BlockExecCtx() { bool Fork::BlockExecCtx() {
@ -235,5 +244,6 @@ void Fork::AwaitThreads() {
std::atomic<bool> Fork::support_enabled_(false); std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false; bool Fork::override_enabled_ = false;
Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr; std::vector<Fork::child_postfork_func>* Fork::reset_child_polling_engine_ =
nullptr;
} // namespace grpc_core } // namespace grpc_core

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <atomic> #include <atomic>
#include <vector>
// //
// NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK // NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
@ -58,7 +59,8 @@ class Fork {
// reset the polling engine's internal state. // reset the polling engine's internal state.
static void SetResetChildPollingEngineFunc( static void SetResetChildPollingEngineFunc(
child_postfork_func reset_child_polling_engine); child_postfork_func reset_child_polling_engine);
static child_postfork_func GetResetChildPollingEngineFunc(); static const std::vector<child_postfork_func>&
GetResetChildPollingEngineFunc();
// Check if there is a single active ExecCtx // Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more, // (the one used to invoke this function). If there are more,
@ -87,7 +89,7 @@ class Fork {
static std::atomic<bool> support_enabled_; static std::atomic<bool> support_enabled_;
static bool override_enabled_; static bool override_enabled_;
static child_postfork_func reset_child_polling_engine_; static std::vector<child_postfork_func>* reset_child_polling_engine_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -100,10 +100,11 @@ void grpc_postfork_child() {
if (!skipped_handler) { if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx(); grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_core::Fork::child_postfork_func reset_polling_engine = for (auto* reset_polling_engine :
grpc_core::Fork::GetResetChildPollingEngineFunc(); grpc_core::Fork::GetResetChildPollingEngineFunc()) {
if (reset_polling_engine != nullptr) { if (reset_polling_engine != nullptr) {
reset_polling_engine(); reset_polling_engine();
}
} }
grpc_timer_manager_set_threading(true); grpc_timer_manager_set_threading(true);
grpc_core::Executor::SetThreadingAll(true); grpc_core::Executor::SetThreadingAll(true);

@ -0,0 +1,37 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:cython_library.bzl", "pyx_library")
pyx_library(
name = "native_debug",
srcs = ["native_debug.pyx"],
deps = ["@com_google_absl//absl/debugging:failure_signal_handler"],
)
py_test(
name = "fork_test",
srcs = glob(["*.py"]),
imports = ["../.."],
main = "_fork_interop_test.py",
python_version = "PY3",
deps = [
":native_debug",
"//src/proto/grpc/testing:empty_py_pb2",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:test_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/interop:service",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)

@ -13,16 +13,29 @@
# limitations under the License. # limitations under the License.
"""Client-side fork interop tests as a unit test.""" """Client-side fork interop tests as a unit test."""
import os
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import threading import threading
import time
import unittest import unittest
from grpc._cython import cygrpc from grpc._cython import cygrpc
from tests.fork import methods from tests.fork import methods
def _dump_streams(name, streams):
assert len(streams) == 2
for stream_name, stream in zip(("STDOUT", "STDERR"), streams):
stream.seek(0)
sys.stderr.write("{} {}:\n{}\n".format(name, stream_name,
stream.read().decode("ascii")))
stream.close()
sys.stderr.flush()
# New instance of multiprocessing.Process using fork without exec can and will # New instance of multiprocessing.Process using fork without exec can and will
# freeze if the Python process has any other threads running. This includes the # freeze if the Python process has any other threads running. This includes the
# additional thread spawned by our _runner.py class. So in order to test our # additional thread spawned by our _runner.py class. So in order to test our
@ -30,27 +43,36 @@ from tests.fork import methods
# we don't have any conflicting background threads. # we don't have any conflicting background threads.
_CLIENT_FORK_SCRIPT_TEMPLATE = """if True: _CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
import os import os
import sys
from grpc._cython import cygrpc from grpc._cython import cygrpc
from tests.fork import methods from tests.fork import methods
from src.python.grpcio_tests.tests.fork import native_debug
native_debug.install_failure_signal_handler()
cygrpc._GRPC_ENABLE_FORK_SUPPORT = True cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
os.environ['GRPC_POLL_STRATEGY'] = 'epoll1' os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
os.environ['GRPC_ENABLE_FORK_SUPPORT'] = 'true'
methods.TestCase.%s.run_test({ methods.TestCase.%s.run_test({
'server_host': 'localhost', 'server_host': 'localhost',
'server_port': %d, 'server_port': %d,
'use_tls': False 'use_tls': False
}) })
""" """
_SUBPROCESS_TIMEOUT_S = 30 _SUBPROCESS_TIMEOUT_S = 80
_GDB_TIMEOUT_S = 60
@unittest.skipUnless( @unittest.skipUnless(
sys.platform.startswith("linux"), sys.platform.startswith("linux"),
"not supported on windows, and fork+exec networking blocked on mac") "not supported on windows, and fork+exec networking blocked on mac")
@unittest.skipUnless(
os.getenv("GRPC_ENABLE_FORK_SUPPORT") is not None,
"Core must be built with fork support to run this test.")
class ForkInteropTest(unittest.TestCase): class ForkInteropTest(unittest.TestCase):
def setUp(self): def setUp(self):
self._port = None
start_server_script = """if True: start_server_script = """if True:
import sys import sys
import time import time
@ -70,23 +92,36 @@ class ForkInteropTest(unittest.TestCase):
while True: while True:
time.sleep(1) time.sleep(1)
""" """
streams = tuple(tempfile.TemporaryFile() for _ in range(2)) self._streams = tuple(tempfile.TemporaryFile() for _ in range(2))
self._server_process = subprocess.Popen( self._server_process = subprocess.Popen(
[sys.executable, '-c', start_server_script], [sys.executable, '-c', start_server_script],
stdout=streams[0], stdout=self._streams[0],
stderr=streams[1]) stderr=self._streams[1])
timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
self._server_process.kill) self._server_process.kill)
interval_secs = 2.0
cumulative_secs = 0.0
try: try:
timer.start() timer.start()
while True: while cumulative_secs < _SUBPROCESS_TIMEOUT_S:
streams[0].seek(0) self._streams[0].seek(0)
s = streams[0].readline() s = self._streams[0].readline()
if not s: if s:
continue
else:
self._port = int(s) self._port = int(s)
break break
time.sleep(interval_secs)
cumulative_secs += interval_secs
if self._port is None:
# Timeout
self._streams[0].seek(0)
sys.stderr.write("Server STDOUT:\n{}\n".format(
self._streams[0].read()))
self._streams[1].seek(0)
sys.stderr.write("Server STDERR:\n{}\n".format(
self._streams[1].read()))
sys.stderr.flush()
raise Exception("Failed to get port from server.")
except ValueError: except ValueError:
raise Exception('Failed to get port from server') raise Exception('Failed to get port from server')
finally: finally:
@ -131,6 +166,37 @@ class ForkInteropTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
self._server_process.kill() self._server_process.kill()
for stream in self._streams:
stream.close()
def _print_backtraces(self, pid):
cmd = [
"gdb",
"-ex",
"set confirm off",
"-ex",
"echo attaching",
"-ex",
"attach {}".format(pid),
"-ex",
"echo print_backtrace",
"-ex",
"thread apply all bt",
"-ex",
"echo printed_backtrace",
"-ex",
"quit",
]
streams = tuple(tempfile.TemporaryFile() for _ in range(2))
sys.stderr.write("Invoking gdb\n")
sys.stderr.flush()
process = subprocess.Popen(cmd, stdout=streams[0], stderr=streams[1])
try:
process.wait(timeout=_GDB_TIMEOUT_S)
except subprocess.TimeoutExpired:
sys.stderr.write("gdb stacktrace generation timed out.\n")
finally:
_dump_streams("gdb", streams)
def _verifyTestCase(self, test_case): def _verifyTestCase(self, test_case):
script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port) script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
@ -138,18 +204,16 @@ class ForkInteropTest(unittest.TestCase):
process = subprocess.Popen([sys.executable, '-c', script], process = subprocess.Popen([sys.executable, '-c', script],
stdout=streams[0], stdout=streams[0],
stderr=streams[1]) stderr=streams[1])
timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill) try:
timer.start() process.wait(timeout=_SUBPROCESS_TIMEOUT_S)
process.wait() self.assertEqual(0, process.returncode)
timer.cancel() except subprocess.TimeoutExpired:
outputs = [] self._print_backtraces(process.pid)
for stream in streams: process.kill()
stream.seek(0) raise AssertionError("Parent process timed out.")
outputs.append(stream.read()) finally:
self.assertEqual( _dump_streams("Parent", streams)
0, process.returncode, _dump_streams("Server", self._streams)
'process failed with exit code %d (stdout: "%s", stderr: "%s")' %
(process.returncode, outputs[0], outputs[1]))
if __name__ == '__main__': if __name__ == '__main__':

@ -19,8 +19,12 @@ import logging
import multiprocessing import multiprocessing
import os import os
import queue import queue
import subprocess
import sys
import tempfile
import threading import threading
import time import time
import traceback
import grpc import grpc
@ -30,7 +34,8 @@ from src.proto.grpc.testing import test_pb2_grpc
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_RPC_TIMEOUT_S = 10 _RPC_TIMEOUT_S = 10
_CHILD_FINISH_TIMEOUT_S = 60 _CHILD_FINISH_TIMEOUT_S = 20
_GDB_TIMEOUT_S = 60
def _channel(args): def _channel(args):
@ -58,6 +63,7 @@ def _async_unary(stub):
response_type=messages_pb2.COMPRESSABLE, response_type=messages_pb2.COMPRESSABLE,
response_size=size, response_size=size,
payload=messages_pb2.Payload(body=b'\x00' * 271828)) payload=messages_pb2.Payload(body=b'\x00' * 271828))
response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S) response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
response = response_future.result() response = response_future.result()
_validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
@ -118,27 +124,113 @@ class _ChildProcess(object):
if args is None: if args is None:
args = () args = ()
self._exceptions = multiprocessing.Queue() self._exceptions = multiprocessing.Queue()
self._stdout_path = tempfile.mkstemp()[1]
self._stderr_path = tempfile.mkstemp()[1]
self._child_pid = None
self._rc = None
self._args = args
self._task = task
def record_exceptions(): def _child_main(self):
try: import faulthandler
task(*args) faulthandler.enable(all_threads=True)
except grpc.RpcError as rpc_error:
self._exceptions.put('RpcError: %s' % rpc_error)
except Exception as e: # pylint: disable=broad-except
self._exceptions.put(e)
self._process = multiprocessing.Process(target=record_exceptions) try:
self._task(*self._args)
except grpc.RpcError as rpc_error:
traceback.print_exc()
self._exceptions.put('RpcError: %s' % rpc_error)
except Exception as e: # pylint: disable=broad-except
traceback.print_exc()
self._exceptions.put(e)
sys.exit(0)
def _orchestrate_child_gdb(self):
cmd = [
"gdb",
"-ex",
"set confirm off",
"-ex",
"attach {}".format(os.getpid()),
"-ex",
"set follow-fork-mode child",
"-ex",
"continue",
"-ex",
"bt",
]
streams = tuple(tempfile.TemporaryFile() for _ in range(2))
sys.stderr.write("Invoking gdb\n")
sys.stderr.flush()
process = subprocess.Popen(cmd, stdout=sys.stderr, stderr=sys.stderr)
time.sleep(5)
def start(self): def start(self):
self._process.start() # NOTE: Try uncommenting the following line if the child is segfaulting.
# self._orchestrate_child_gdb()
ret = os.fork()
if ret == 0:
self._child_main()
else:
self._child_pid = ret
def wait(self, timeout):
total = 0.0
wait_interval = 1.0
while total < timeout:
ret, termination = os.waitpid(self._child_pid, os.WNOHANG)
if ret == self._child_pid:
self._rc = termination
return True
time.sleep(wait_interval)
total += wait_interval
else:
return False
def _print_backtraces(self):
cmd = [
"gdb",
"-ex",
"set confirm off",
"-ex",
"echo attaching",
"-ex",
"attach {}".format(self._child_pid),
"-ex",
"echo print_backtrace",
"-ex",
"thread apply all bt",
"-ex",
"echo printed_backtrace",
"-ex",
"quit",
]
streams = tuple(tempfile.TemporaryFile() for _ in range(2))
sys.stderr.write("Invoking gdb\n")
sys.stderr.flush()
process = subprocess.Popen(cmd, stdout=streams[0], stderr=streams[1])
try:
process.wait(timeout=_GDB_TIMEOUT_S)
except subprocess.TimeoutExpired:
sys.stderr.write("gdb stacktrace generation timed out.\n")
finally:
for stream_name, stream in zip(("STDOUT", "STDERR"), streams):
stream.seek(0)
sys.stderr.write("gdb {}:\n{}\n".format(
stream_name,
stream.read().decode("ascii")))
stream.close()
sys.stderr.flush()
def finish(self): def finish(self):
self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S) terminated = self.wait(_CHILD_FINISH_TIMEOUT_S)
if self._process.is_alive(): sys.stderr.write("Exit code: {}\n".format(self._rc))
if not terminated:
self._print_backtraces()
raise RuntimeError('Child process did not terminate') raise RuntimeError('Child process did not terminate')
if self._process.exitcode != 0: if self._rc != 0:
raise ValueError('Child process failed with exitcode %d' % raise ValueError('Child process failed with exitcode %d' % self._rc)
self._process.exitcode)
try: try:
exception = self._exceptions.get(block=False) exception = self._exceptions.get(block=False)
raise ValueError('Child process failed: "%s": "%s"' % raise ValueError('Child process failed: "%s": "%s"' %
@ -449,3 +541,12 @@ class TestCase(enum.Enum):
raise NotImplementedError('Test case "%s" not implemented!' % raise NotImplementedError('Test case "%s" not implemented!' %
self.name) self.name)
channel.close() channel.close()
# Useful if needing to find a block of code from an address in an SO.
def dump_object_map():
with open("/proc/self/maps", "r") as f:
sys.stderr.write("=============== /proc/self/maps ===============\n")
sys.stderr.write(f.read())
sys.stderr.write("\n")
sys.stderr.flush()

@ -0,0 +1,33 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from libcpp cimport bool
cdef extern from "absl/debugging/failure_signal_handler.h" namespace "absl":
ctypedef struct FailureSignalHandlerOptions "::absl::FailureSignalHandlerOptions":
bool symbolize_stacktrace
bool use_alternate_stack
int alarm_on_failure_secs
bool call_previous_handler
void (*writerfn)(const char*)
void InstallFailureSignalHandler(const FailureSignalHandlerOptions& options)
def install_failure_signal_handler(**kwargs):
cdef FailureSignalHandlerOptions opts = FailureSignalHandlerOptions(
True, False, -1, False, NULL
)
InstallFailureSignalHandler(opts)

@ -46,6 +46,11 @@ build:fuzzer_asan --linkopt=-fsanitize=fuzzer,address
build:fuzzer_asan --action_env=ASAN_OPTIONS=detect_leaks=1:color=always build:fuzzer_asan --action_env=ASAN_OPTIONS=detect_leaks=1:color=always
build:fuzzer_asan --action_env=LSAN_OPTIONS=suppressions=test/core/util/lsan_suppressions.txt:report_objects=1 build:fuzzer_asan --action_env=LSAN_OPTIONS=suppressions=test/core/util/lsan_suppressions.txt:report_objects=1
build:fork_support --cxxopt=-DGRPC_ENABLE_FORK_SUPPORT=1
build:fork_support --cxxopt=-DGRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1
build:fork_support --action_env=GRPC_ENABLE_FORK_SUPPORT=1
# We have a separate ASAN config for Mac OS to workaround a couple of bugs: # We have a separate ASAN config for Mac OS to workaround a couple of bugs:
# 1. https://github.com/bazelbuild/bazel/issues/6932 # 1. https://github.com/bazelbuild/bazel/issues/6932
# _FORTIFY_SOURCE=1 is enabled by default on Mac OS, which breaks ASAN. # _FORTIFY_SOURCE=1 is enabled by default on Mac OS, which breaks ASAN.

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
RESULTSTORE_RESULTS_FLAG="--bazelrc=tools/remote_build/include/test_locally_with_resultstore_results.bazelrc"
BAZEL_FLAGS="--test_output=errors"
python3 tools/run_tests/python_utils/bazel_report_helper.py --report_path python_bazel_tests_fork_support
# TODO(https://github.com/grpc/grpc/issues/32207): Pull this out into a
# separate Kokoro job so we can do more runs without impacting overall PR
# presubmit duration.
python_bazel_tests_fork_support/bazel_wrapper ${RESULTSTORE_RESULTS_FLAG} test --config=fork_support --runs_per_test=16 ${BAZEL_FLAGS} //src/python/grpcio_tests/tests/fork:fork_test

@ -27,3 +27,9 @@ python_bazel_tests_single_threaded_unary_streams/bazel_wrapper ${RESULTSTORE_RES
python3 tools/run_tests/python_utils/bazel_report_helper.py --report_path python_bazel_tests_poller_engine python3 tools/run_tests/python_utils/bazel_report_helper.py --report_path python_bazel_tests_poller_engine
python_bazel_tests_poller_engine/bazel_wrapper ${RESULTSTORE_RESULTS_FLAG} test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS} python_bazel_tests_poller_engine/bazel_wrapper ${RESULTSTORE_RESULTS_FLAG} test --config=python_poller_engine ${BAZEL_FLAGS} ${TEST_TARGETS}
python3 tools/run_tests/python_utils/bazel_report_helper.py --report_path python_bazel_tests_fork_support
# TODO(https://github.com/grpc/grpc/issues/32207): Remove from this job once
# the fork job is in the master dashboard.
python_bazel_tests_fork_support/bazel_wrapper ${RESULTSTORE_RESULTS_FLAG} test --config=fork_support --runs_per_test=16 ${BAZEL_FLAGS} //src/python/grpcio_tests/tests/fork:fork_test

@ -0,0 +1,30 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Config file for the internal CI (in protobuf text format)
# Location of the continuous shell script in repository.
build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh"
timeout_mins: 240
action {
define_artifacts {
regex: "**/*sponge_log.*"
regex: "github/grpc/reports/**"
}
}
env_vars {
key: "BAZEL_SCRIPT"
value: "tools/internal_ci/linux/grpc_python_bazel_test_fork_in_docker.sh"
}
Loading…
Cancel
Save