@ -1,5 +1,5 @@
#!/usr/bin/env python
#!/usr/bin/env python3
#coding:utf-8
# coding:utf-8
import os
import os
import sys
import sys
import signal
import signal
@ -11,16 +11,18 @@ import re
import itertools
import itertools
import tempfile
import tempfile
DEVNULL = open ( os . devnull , ' wb ' )
DEVNULL = open ( os . devnull , " wb " )
SIGNUM_TO_SIGNAME = dict ( ( v , k ) for k , v in signal . __dict__ . items ( ) if re . match ( " ^SIG[A-Z]+$ " , k ) )
SIGNUM_TO_SIGNAME = dict (
( v , k ) for k , v in signal . __dict__ . items ( ) if re . match ( " ^SIG[A-Z]+$ " , k )
)
def busy_wait ( condition_callable , timeout ) :
def busy_wait ( condition_callable , timeout ) :
checks = 100
checks = 100
increment = float ( timeout ) / checks
increment = float ( timeout ) / checks
for _ in x range( checks ) :
for _ in range ( checks ) :
if condition_callable ( ) :
if condition_callable ( ) :
return
return
time . sleep ( increment )
time . sleep ( increment )
@ -42,27 +44,37 @@ def main():
# Run the exit code test. We use POSIXLY_CORRECT here to not need --
# Run the exit code test. We use POSIXLY_CORRECT here to not need --
# until that's the default in Tini anyways.
# until that's the default in Tini anyways.
if not args_disabled :
if not args_disabled :
print " Running exit code test for {0} " . format ( tini )
print ( " Running exit code test for {0} " . format ( tini ) )
for code in range ( 0 , 256 ) :
for code in range ( 0 , 256 ) :
p = subprocess . Popen (
p = subprocess . Popen (
[ tini , ' -e ' , str ( code ) , ' -- ' , ' sh ' , ' -c ' , " exit {0} " . format ( code ) ] ,
[ tini , " -e " , str ( code ) , " -- " , " sh " , " -c " , " exit {0} " . format ( code ) ] ,
stdout = DEVNULL , stderr = DEVNULL
stdout = DEVNULL ,
stderr = DEVNULL ,
universal_newlines = True ,
)
)
ret = p . wait ( )
ret = p . wait ( )
assert ret == 0 , " Inclusive exit code test failed for %s , exit: %s " % ( code , ret )
assert ret == 0 , " Inclusive exit code test failed for %s , exit: %s " % (
code ,
ret ,
)
other_codes = [ x for x in range ( 0 , 256 ) if x != code ]
other_codes = [ x for x in range ( 0 , 256 ) if x != code ]
args = list ( itertools . chain ( * [ [ ' -e ' , str ( x ) ] for x in other_codes ] ) )
args = list ( itertools . chain ( * [ [ " -e " , str ( x ) ] for x in other_codes ] ) )
p = subprocess . Popen (
p = subprocess . Popen (
[ tini ] + args + [ ' sh ' , ' -c ' , " exit {0} " . format ( code ) ] ,
[ tini ] + args + [ " sh " , " -c " , " exit {0} " . format ( code ) ] ,
env = dict ( os . environ , POSIXLY_CORRECT = " 1 " ) ,
env = dict ( os . environ , POSIXLY_CORRECT = " 1 " ) ,
stdout = DEVNULL , stderr = DEVNULL
stdout = DEVNULL ,
stderr = DEVNULL ,
universal_newlines = True ,
)
)
ret = p . wait ( )
ret = p . wait ( )
assert ret == code , " Exclusive exit code test failed for %s , exit: %s " % ( code , ret )
assert ret == code , " Exclusive exit code test failed for %s , exit: %s " % (
code ,
ret ,
)
tests = [ ( [ proxy , tini ] , { } ) , ]
tests = [ ( [ proxy , tini ] , { } ) ]
if subreaper_support :
if subreaper_support :
if not args_disabled :
if not args_disabled :
@ -71,10 +83,14 @@ def main():
for target , env in tests :
for target , env in tests :
# Run the reaping test
# Run the reaping test
print " Running reaping test ( {0} with env {1} ) " . format ( " " . join ( target ) , env )
print ( " Running reaping test ( {0} with env {1} ) " . format ( " " . join ( target ) , env ) )
p = subprocess . Popen ( target + [ os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
p = subprocess . Popen (
target + [ os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
env = dict ( os . environ , * * env ) ,
env = dict ( os . environ , * * env ) ,
stdout = subprocess . PIPE , stderr = subprocess . PIPE )
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
universal_newlines = True ,
)
out , err = p . communicate ( )
out , err = p . communicate ( )
@ -83,48 +99,81 @@ def main():
# and will output the error message here.
# and will output the error message here.
assert " zombie reaping won ' t work " not in err , " Warning message was output! "
assert " zombie reaping won ' t work " not in err , " Warning message was output! "
ret = p . wait ( )
ret = p . wait ( )
assert " Reaped zombie process with pid= " not in err , " Warning message was output! "
assert (
" Reaped zombie process with pid= " not in err
) , " Warning message was output! "
assert ret == 0 , " Reaping test failed! \n OUT: %s \n ERR: %s " % ( out , err )
assert ret == 0 , " Reaping test failed! \n OUT: %s \n ERR: %s " % ( out , err )
if not args_disabled :
if not args_disabled :
print " Running reaping display test ( {0} with env {1} ) " . format ( " " . join ( target ) , env )
print (
p = subprocess . Popen ( target + [ " -w " , os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
" Running reaping display test ( {0} with env {1} ) " . format (
" " . join ( target ) , env
)
)
p = subprocess . Popen (
target + [ " -w " , os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
env = dict ( os . environ , * * env ) ,
env = dict ( os . environ , * * env ) ,
stdout = subprocess . PIPE , stderr = subprocess . PIPE )
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
universal_newlines = True ,
)
out , err = p . communicate ( )
out , err = p . communicate ( )
ret = p . wait ( )
ret = p . wait ( )
assert " Reaped zombie process with pid= " in err , " Warning message was output! "
assert (
assert ret == 0 , " Reaping display test failed! \n OUT: %s \n ERR: %s " % ( out , err )
" Reaped zombie process with pid= " in err
) , " Warning message was output! "
assert ret == 0 , " Reaping display test failed! \n OUT: %s \n ERR: %s " % (
out ,
err ,
)
# Run the signals test
# Run the signals test
for signum in [ signal . SIGTERM , signal . SIGUSR1 , signal . SIGUSR2 ] :
for signum in [ signal . SIGTERM , signal . SIGUSR1 , signal . SIGUSR2 ] :
print " running signal test for: {0} ( {1} with env {2} ) " . format ( signum , " " . join ( target ) , env )
print (
p = subprocess . Popen ( target + [ os . path . join ( src , " test " , " signals " , " test.py " ) ] , env = dict ( os . environ , * * env ) )
" running signal test for: {0} ( {1} with env {2} ) " . format (
busy_wait ( lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) > 1 , 10 )
signum , " " . join ( target ) , env
)
)
p = subprocess . Popen (
target + [ os . path . join ( src , " test " , " signals " , " test.py " ) ] ,
env = dict ( os . environ , * * env ) ,
universal_newlines = True ,
)
busy_wait (
lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) > 1 , 10
)
p . send_signal ( signum )
p . send_signal ( signum )
ret = p . wait ( )
ret = p . wait ( )
assert ret == 128 + signum , " Signals test failed (ret was {0} , expected {1} ) " . format ( ret , 128 + signum )
assert (
ret == 128 + signum
) , " Signals test failed (ret was {0} , expected {1} ) " . format (
ret , 128 + signum
)
# Run the process group test
# Run the process group test
# This test has Tini spawn a process that ignores SIGUSR1 and spawns a child that doesn't (and waits on the child)
# This test has Tini spawn a process that ignores SIGUSR1 and spawns a child that doesn't (and waits on the child)
# We send SIGUSR1 to Tini, and expect the grand-child to terminate, then the child, and then Tini.
# We send SIGUSR1 to Tini, and expect the grand-child to terminate, then the child, and then Tini.
if not args_disabled :
if not args_disabled :
print " Running process group test (arguments) "
print ( " Running process group test (arguments) " )
p = subprocess . Popen ( [ tini , ' -g ' , os . path . join ( src , " test " , " pgroup " , " stage_1.py " ) ] , stdout = subprocess . PIPE , stderr = subprocess . PIPE )
p = subprocess . Popen (
[ tini , " -g " , os . path . join ( src , " test " , " pgroup " , " stage_1.py " ) ] ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
universal_newlines = True ,
)
busy_wait ( lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) == 2 , 10 )
busy_wait ( lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) == 2 , 10 )
p . send_signal ( signal . SIGUSR1 )
p . send_signal ( signal . SIGUSR1 )
busy_wait ( lambda : p . poll ( ) is not None , 10 )
busy_wait ( lambda : p . poll ( ) is not None , 10 )
print " Running process group test (environment variable) "
print ( " Running process group test (environment variable) " )
p = subprocess . Popen (
p = subprocess . Popen (
[ tini , os . path . join ( src , " test " , " pgroup " , " stage_1.py " ) ] ,
[ tini , os . path . join ( src , " test " , " pgroup " , " stage_1.py " ) ] ,
stdout = subprocess . PIPE , stderr = subprocess . PIPE ,
stdout = subprocess . PIPE ,
env = dict ( os . environ , TINI_KILL_PROCESS_GROUP = " 1 " )
stderr = subprocess . PIPE ,
env = dict ( os . environ , TINI_KILL_PROCESS_GROUP = " 1 " ) ,
universal_newlines = True ,
)
)
busy_wait ( lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) == 2 , 10 )
busy_wait ( lambda : len ( psutil . Process ( p . pid ) . children ( recursive = True ) ) == 2 , 10 )
@ -133,11 +182,13 @@ def main():
# Run failing test. Force verbosity to 1 so we see the subreaper warning
# Run failing test. Force verbosity to 1 so we see the subreaper warning
# regardless of whether MINIMAL is set.
# regardless of whether MINIMAL is set.
print " Running zombie reaping failure test (Tini should warn) "
print ( " Running zombie reaping failure test (Tini should warn) " )
p = subprocess . Popen (
p = subprocess . Popen (
[ tini , os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
[ tini , os . path . join ( src , " test " , " reaping " , " stage_1.py " ) ] ,
stdout = subprocess . PIPE , stderr = subprocess . PIPE ,
stdout = subprocess . PIPE ,
env = { ' TINI_VERBOSITY ' : ' 1 ' }
stderr = subprocess . PIPE ,
env = dict ( os . environ , TINI_VERBOSITY = " 1 " ) ,
universal_newlines = True ,
)
)
out , err = p . communicate ( )
out , err = p . communicate ( )
assert " zombie reaping won ' t work " in err , " No warning message was output! "
assert " zombie reaping won ' t work " in err , " No warning message was output! "
@ -145,37 +196,62 @@ def main():
assert ret == 1 , " Reaping test succeeded (it should have failed)! "
assert ret == 1 , " Reaping test succeeded (it should have failed)! "
# Test that the signals are properly in place here.
# Test that the signals are properly in place here.
print " Running signal configuration test "
print ( " Running signal configuration test " )
p = subprocess . Popen ( [ os . path . join ( build , " sigconf-test " ) , tini , " cat " , " /proc/self/status " ] , stdout = subprocess . PIPE , stderr = subprocess . PIPE )
p = subprocess . Popen (
[ os . path . join ( build , " sigconf-test " ) , tini , " cat " , " /proc/self/status " ] ,
stdout = subprocess . PIPE ,
stderr = subprocess . PIPE ,
universal_newlines = True ,
)
out , err = p . communicate ( )
out , err = p . communicate ( )
# Extract the signal properties, and add a zero at the end.
# Extract the signal properties, and add a zero at the end.
props = [ line . split ( " : " ) for line in out . splitlines ( ) ]
props = [ line . split ( " : " ) for line in out . splitlines ( ) ]
props = [ ( k . strip ( ) , v . strip ( ) ) for ( k , v ) in props ]
props = [ ( k . strip ( ) , v . strip ( ) ) for ( k , v ) in props ]
props = [ ( k , bitmap . BitMap . fromstring ( bin ( int ( v , 16 ) ) [ 2 : ] . zfill ( 32 ) ) ) for ( k , v ) in props if k in [ " SigBlk " , " SigIgn " , " SigCgt " ] ]
props = [
( k , bitmap . BitMap . fromstring ( bin ( int ( v , 16 ) ) [ 2 : ] . zfill ( 32 ) ) )
for ( k , v ) in props
if k in [ " SigBlk " , " SigIgn " , " SigCgt " ]
]
props = dict ( props )
props = dict ( props )
# Print actual handling configuration
# Print actual handling configuration
for k , bmp in props . items ( ) :
for k , bmp in props . items ( ) :
print " {0} : {1} " . format ( k , " , " . join ( [ " {0} ( {1} ) " . format ( SIGNUM_TO_SIGNAME [ n + 1 ] , n + 1 ) for n in bmp . nonzero ( ) ] ) )
print (
" {0} : {1} " . format (
k ,
" , " . join (
[
" {0} ( {1} ) " . format ( SIGNUM_TO_SIGNAME [ n + 1 ] , n + 1 )
for n in bmp . nonzero ( )
]
) ,
)
)
for signal_set_name , signals_to_test_for in [
for signal_set_name , signals_to_test_for in [
( " SigIgn " , [ signal . SIGTTOU , signal . SIGSEGV , signal . SIGINT , ] ) ,
( " SigIgn " , [ signal . SIGTTOU , signal . SIGSEGV , signal . SIGINT ] ) ,
( " SigBlk " , [ signal . SIGTTIN , signal . SIGILL , signal . SIGTERM , ] ) ,
( " SigBlk " , [ signal . SIGTTIN , signal . SIGILL , signal . SIGTERM ] ) ,
] :
] :
for signum in signals_to_test_for :
for signum in signals_to_test_for :
# Use signum - 1 because the bitmap is 0-indexed but represents signals strting at 1
# Use signum - 1 because the bitmap is 0-indexed but represents signals strting at 1
assert ( signum - 1 ) in props [ signal_set_name ] . nonzero ( ) , " {0} ( {1} ) is missing in {2} ! " . format ( SIGNUM_TO_SIGNAME [ signum ] , signum , signal_set_name )
assert ( signum - 1 ) in props [
signal_set_name
] . nonzero ( ) , " {0} ( {1} ) is missing in {2} ! " . format (
SIGNUM_TO_SIGNAME [ signum ] , signum , signal_set_name
)
# Test parent death signal handling.
# Test parent death signal handling.
if not args_disabled :
if not args_disabled :
print " Running parent death signal test "
print ( " Running parent death signal test " )
f = tempfile . NamedTemporaryFile ( )
f = tempfile . NamedTemporaryFile ( )
try :
try :
p = subprocess . Popen (
p = subprocess . Popen (
[ os . path . join ( src , " test " , " pdeathsignal " , " stage_1.py " ) , tini , f . name ] ,
[ os . path . join ( src , " test " , " pdeathsignal " , " stage_1.py " ) , tini , f . name ] ,
stdout = DEVNULL , stderr = DEVNULL
stdout = DEVNULL ,
stderr = DEVNULL ,
universal_newlines = True ,
)
)
p . wait ( )
p . wait ( )
@ -183,9 +259,9 @@ def main():
finally :
finally :
f . close ( )
f . close ( )
print " --------------------------- "
print ( " --------------------------- " )
print " All done, tests as expected "
print ( " All done, tests as expected " )
print " --------------------------- "
print ( " --------------------------- " )
if __name__ == " __main__ " :
if __name__ == " __main__ " :