mirror of https://github.com/krallin/tini.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
268 lines
8.9 KiB
268 lines
8.9 KiB
#!/usr/bin/env python3 |
|
# coding:utf-8 |
|
import os |
|
import sys |
|
import signal |
|
import subprocess |
|
import time |
|
import psutil |
|
import bitmap |
|
import re |
|
import itertools |
|
import tempfile |
|
|
|
DEVNULL = open(os.devnull, "wb") |
|
|
|
SIGNUM_TO_SIGNAME = dict( |
|
(v, k) for k, v in signal.__dict__.items() if re.match("^SIG[A-Z]+$", k) |
|
) |
|
|
|
|
|
def busy_wait(condition_callable, timeout): |
|
checks = 100 |
|
increment = float(timeout) / checks |
|
|
|
for _ in range(checks): |
|
if condition_callable(): |
|
return |
|
time.sleep(increment) |
|
|
|
assert False, "Condition was never met" |
|
|
|
|
|
def main(): |
|
src = os.environ["SOURCE_DIR"] |
|
build = os.environ["BUILD_DIR"] |
|
|
|
args_disabled = os.environ.get("MINIMAL") |
|
|
|
proxy = os.path.join(src, "test", "subreaper-proxy.py") |
|
tini = os.path.join(build, "tini") |
|
|
|
subreaper_support = bool(int(os.environ["FORCE_SUBREAPER"])) |
|
|
|
# Run the exit code test. We use POSIXLY_CORRECT here to not need -- |
|
# until that's the default in Tini anyways. |
|
if not args_disabled: |
|
print("Running exit code test for {0}".format(tini)) |
|
for code in range(0, 256): |
|
p = subprocess.Popen( |
|
[tini, "-e", str(code), "--", "sh", "-c", "exit {0}".format(code)], |
|
stdout=DEVNULL, |
|
stderr=DEVNULL, |
|
universal_newlines=True, |
|
) |
|
ret = p.wait() |
|
assert ret == 0, "Inclusive exit code test failed for %s, exit: %s" % ( |
|
code, |
|
ret, |
|
) |
|
|
|
other_codes = [x for x in range(0, 256) if x != code] |
|
args = list(itertools.chain(*[["-e", str(x)] for x in other_codes])) |
|
|
|
p = subprocess.Popen( |
|
[tini] + args + ["sh", "-c", "exit {0}".format(code)], |
|
env=dict(os.environ, POSIXLY_CORRECT="1"), |
|
stdout=DEVNULL, |
|
stderr=DEVNULL, |
|
universal_newlines=True, |
|
) |
|
ret = p.wait() |
|
assert ret == code, "Exclusive exit code test failed for %s, exit: %s" % ( |
|
code, |
|
ret, |
|
) |
|
|
|
tests = [([proxy, tini], {})] |
|
|
|
if subreaper_support: |
|
if not args_disabled: |
|
tests.append(([tini, "-s"], {})) |
|
tests.append(([tini], {"TINI_SUBREAPER": ""})) |
|
|
|
for target, env in tests: |
|
# Run the reaping test |
|
print("Running reaping test ({0} with env {1})".format(" ".join(target), env)) |
|
p = subprocess.Popen( |
|
target + [os.path.join(src, "test", "reaping", "stage_1.py")], |
|
env=dict(os.environ, **env), |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
universal_newlines=True, |
|
) |
|
|
|
out, err = p.communicate() |
|
|
|
if subreaper_support: |
|
# If subreaper support sin't available, Tini won't looku p its subreaper bit |
|
# and will output the error message here. |
|
assert "zombie reaping won't work" not in err, "Warning message was output!" |
|
ret = p.wait() |
|
assert ( |
|
"Reaped zombie process with pid=" not in err |
|
), "Warning message was output!" |
|
assert ret == 0, "Reaping test failed!\nOUT: %s\nERR: %s" % (out, err) |
|
|
|
if not args_disabled: |
|
print( |
|
"Running reaping display test ({0} with env {1})".format( |
|
" ".join(target), env |
|
) |
|
) |
|
p = subprocess.Popen( |
|
target + ["-w", os.path.join(src, "test", "reaping", "stage_1.py")], |
|
env=dict(os.environ, **env), |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
universal_newlines=True, |
|
) |
|
|
|
out, err = p.communicate() |
|
ret = p.wait() |
|
assert ( |
|
"Reaped zombie process with pid=" in err |
|
), "Warning message was output!" |
|
assert ret == 0, "Reaping display test failed!\nOUT: %s\nERR: %s" % ( |
|
out, |
|
err, |
|
) |
|
|
|
# Run the signals test |
|
for signum in [signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2]: |
|
print( |
|
"running signal test for: {0} ({1} with env {2})".format( |
|
signum, " ".join(target), env |
|
) |
|
) |
|
p = subprocess.Popen( |
|
target + [os.path.join(src, "test", "signals", "test.py")], |
|
env=dict(os.environ, **env), |
|
universal_newlines=True, |
|
) |
|
busy_wait( |
|
lambda: len(psutil.Process(p.pid).children(recursive=True)) > 1, 10 |
|
) |
|
p.send_signal(signum) |
|
ret = p.wait() |
|
assert ( |
|
ret == 128 + signum |
|
), "Signals test failed (ret was {0}, expected {1})".format( |
|
ret, 128 + signum |
|
) |
|
|
|
# Run the process group test |
|
# This test has Tini spawn a process that ignores SIGUSR1 and spawns a child that doesn't (and waits on the child) |
|
# We send SIGUSR1 to Tini, and expect the grand-child to terminate, then the child, and then Tini. |
|
if not args_disabled: |
|
print("Running process group test (arguments)") |
|
p = subprocess.Popen( |
|
[tini, "-g", os.path.join(src, "test", "pgroup", "stage_1.py")], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
universal_newlines=True, |
|
) |
|
|
|
busy_wait(lambda: len(psutil.Process(p.pid).children(recursive=True)) == 2, 10) |
|
p.send_signal(signal.SIGUSR1) |
|
busy_wait(lambda: p.poll() is not None, 10) |
|
|
|
print("Running process group test (environment variable)") |
|
p = subprocess.Popen( |
|
[tini, os.path.join(src, "test", "pgroup", "stage_1.py")], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
env=dict(os.environ, TINI_KILL_PROCESS_GROUP="1"), |
|
universal_newlines=True, |
|
) |
|
|
|
busy_wait(lambda: len(psutil.Process(p.pid).children(recursive=True)) == 2, 10) |
|
p.send_signal(signal.SIGUSR1) |
|
busy_wait(lambda: p.poll() is not None, 10) |
|
|
|
# Run failing test. Force verbosity to 1 so we see the subreaper warning |
|
# regardless of whether MINIMAL is set. |
|
print("Running zombie reaping failure test (Tini should warn)") |
|
p = subprocess.Popen( |
|
[tini, os.path.join(src, "test", "reaping", "stage_1.py")], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
env=dict(os.environ, TINI_VERBOSITY="1"), |
|
universal_newlines=True, |
|
) |
|
out, err = p.communicate() |
|
assert "zombie reaping won't work" in err, "No warning message was output!" |
|
ret = p.wait() |
|
assert ret == 1, "Reaping test succeeded (it should have failed)!" |
|
|
|
# Test that the signals are properly in place here. |
|
print("Running signal configuration test") |
|
|
|
p = subprocess.Popen( |
|
[os.path.join(build, "sigconf-test"), tini, "cat", "/proc/self/status"], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
universal_newlines=True, |
|
) |
|
out, err = p.communicate() |
|
|
|
# Extract the signal properties, and add a zero at the end. |
|
props = [line.split(":") for line in out.splitlines()] |
|
props = [(k.strip(), v.strip()) for (k, v) in props] |
|
props = [ |
|
(k, bitmap.BitMap.fromstring(bin(int(v, 16))[2:].zfill(32))) |
|
for (k, v) in props |
|
if k in ["SigBlk", "SigIgn", "SigCgt"] |
|
] |
|
props = dict(props) |
|
|
|
# Print actual handling configuration |
|
for k, bmp in props.items(): |
|
print( |
|
"{0}: {1}".format( |
|
k, |
|
", ".join( |
|
[ |
|
"{0} ({1})".format(SIGNUM_TO_SIGNAME[n + 1], n + 1) |
|
for n in bmp.nonzero() |
|
] |
|
), |
|
) |
|
) |
|
|
|
for signal_set_name, signals_to_test_for in [ |
|
("SigIgn", [signal.SIGTTOU, signal.SIGSEGV, signal.SIGINT]), |
|
("SigBlk", [signal.SIGTTIN, signal.SIGILL, signal.SIGTERM]), |
|
]: |
|
for signum in signals_to_test_for: |
|
# Use signum - 1 because the bitmap is 0-indexed but represents signals strting at 1 |
|
assert (signum - 1) in props[ |
|
signal_set_name |
|
].nonzero(), "{0} ({1}) is missing in {2}!".format( |
|
SIGNUM_TO_SIGNAME[signum], signum, signal_set_name |
|
) |
|
|
|
# Test parent death signal handling. |
|
if not args_disabled: |
|
print("Running parent death signal test") |
|
f = tempfile.NamedTemporaryFile() |
|
try: |
|
p = subprocess.Popen( |
|
[os.path.join(src, "test", "pdeathsignal", "stage_1.py"), tini, f.name], |
|
stdout=DEVNULL, |
|
stderr=DEVNULL, |
|
universal_newlines=True, |
|
) |
|
p.wait() |
|
|
|
busy_wait(lambda: open(f.name).read() == "ok", 10) |
|
finally: |
|
f.close() |
|
|
|
print("---------------------------") |
|
print("All done, tests as expected") |
|
print("---------------------------") |
|
|
|
|
|
if __name__ == "__main__": |
|
main()
|
|
|