Various cosmetic improvements to run_tests.py

-) Adding a new command line flag to be able to see which tests are passing.
  -) Adding more status when the tool is idle in forever mode.
  -) Adding a last status when the tool is finished so to avoid leaving the console in a weird state.
  -) Adding a status message in the forever mode if the previous run failed.
  -) Swapped the message and its explanation, so you don't have to scroll up to see which test failed.
  -) Fixed a race condition in the watch_dir.py code if a file is deleted during the loop.
pull/37/head
Nicolas Noble 10 years ago
parent e55e1155d3
commit 044db7422a
  1. 37
      tools/run_tests/jobset.py
  2. 21
      tools/run_tests/run_tests.py
  3. 7
      tools/run_tests/watch_dirs.py

@ -40,9 +40,11 @@ _KILLED = object()
_COLORS = {
'red': 31,
'green': 32,
'yellow': 33,
'red': [ 31, 0 ],
'green': [ 32, 0 ],
'yellow': [ 33, 0 ],
'lightgray': [ 37, 0],
'gray': [ 30, 1 ],
}
@ -53,32 +55,37 @@ _CLEAR_LINE = '\x1b[2K'
_TAG_COLOR = {
'FAILED': 'red',
'PASSED': 'green',
'START': 'yellow',
'START': 'gray',
'WAITING': 'yellow',
'SUCCESS': 'green',
'IDLE': 'gray',
}
def message(tag, message, explanatory_text=None):
sys.stdout.write('%s%s\x1b[%dm%s\x1b[0m: %s%s' % (
def message(tag, message, explanatory_text=None, do_newline=False):
sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
_BEGINNING_OF_LINE,
_CLEAR_LINE,
_COLORS[_TAG_COLOR[tag]],
'\n%s' % explanatory_text if explanatory_text is not None else '',
_COLORS[_TAG_COLOR[tag]][1],
_COLORS[_TAG_COLOR[tag]][0],
tag,
message,
'\n%s\n' % explanatory_text if explanatory_text is not None else ''))
'\n' if do_newline or explanatory_text is not None else ''))
sys.stdout.flush()
class Job(object):
"""Manages one job."""
def __init__(self, cmdline):
def __init__(self, cmdline, newline_on_success):
self._cmdline = ' '.join(cmdline)
self._tempfile = tempfile.TemporaryFile()
self._process = subprocess.Popen(args=cmdline,
stderr=subprocess.STDOUT,
stdout=self._tempfile)
self._state = _RUNNING
self._newline_on_success = newline_on_success
message('START', self._cmdline)
def state(self):
@ -91,7 +98,7 @@ class Job(object):
message('FAILED', '%s [ret=%d]' % (self._cmdline, self._process.returncode), stdout)
else:
self._state = _SUCCESS
message('PASSED', '%s' % self._cmdline)
message('PASSED', '%s' % self._cmdline, do_newline=self._newline_on_success)
return self._state
def kill(self):
@ -103,13 +110,14 @@ class Job(object):
class Jobset(object):
"""Manages one run of jobs."""
def __init__(self, check_cancelled, maxjobs):
def __init__(self, check_cancelled, maxjobs, newline_on_success):
self._running = set()
self._check_cancelled = check_cancelled
self._cancelled = False
self._failures = 0
self._completed = 0
self._maxjobs = maxjobs
self._newline_on_success = newline_on_success
def start(self, cmdline):
"""Start a job. Return True on success, False on failure."""
@ -117,7 +125,7 @@ class Jobset(object):
if self.cancelled(): return False
self.reap()
if self.cancelled(): return False
self._running.add(Job(cmdline))
self._running.add(Job(cmdline, self._newline_on_success))
return True
def reap(self):
@ -157,9 +165,10 @@ def _never_cancelled():
return False
def run(cmdlines, check_cancelled=_never_cancelled, maxjobs=None):
def run(cmdlines, check_cancelled=_never_cancelled, maxjobs=None, newline_on_success=False):
js = Jobset(check_cancelled,
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS)
maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
newline_on_success)
for cmdline in shuffle_iteratable(cmdlines):
if not js.start(cmdline):
break

@ -56,6 +56,10 @@ argp.add_argument('-f', '--forever',
default=False,
action='store_const',
const=True)
argp.add_argument('--newline_on_success',
default=False,
action='store_const',
const=True)
args = argp.parse_args()
# grab config
@ -69,7 +73,7 @@ runs_per_test = args.runs_per_test
forever = args.forever
def _build_and_run(check_cancelled):
def _build_and_run(check_cancelled, newline_on_success, forever=False):
"""Do one pass of building & running tests."""
# build latest, sharing cpu between the various makes
if not jobset.run(
@ -88,20 +92,29 @@ def _build_and_run(check_cancelled):
for x in itertools.chain.from_iterable(itertools.repeat(
glob.glob('bins/%s/%s_test' % (
config.build_config, filt)),
runs_per_test))), check_cancelled):
runs_per_test))), check_cancelled, newline_on_success=newline_on_success):
if not forever:
jobset.message('FAILED', 'Some tests failed', do_newline=True)
return 2
if not forever:
jobset.message('SUCCESS', 'All tests passed', do_newline=True)
return 0
if forever:
success = True
while True:
dw = watch_dirs.DirWatcher(['src', 'include', 'test'])
initial_time = dw.most_recent_change()
have_files_changed = lambda: dw.most_recent_change() != initial_time
_build_and_run(have_files_changed)
previous_success = success
success = _build_and_run(have_files_changed, newline_on_success=False, forever=True) == 0
if not previous_success and success:
jobset.message('SUCCESS', 'All tests are now passing properly', do_newline=True)
jobset.message('IDLE', 'No change detected')
while not have_files_changed():
time.sleep(1)
else:
sys.exit(_build_and_run(lambda: False))
sys.exit(_build_and_run(lambda: False, newline_on_success=args.newline_on_success))

@ -25,7 +25,12 @@ class DirWatcher(object):
continue
for root, _, files in os.walk(path):
for f in files:
st = os.stat(os.path.join(root, f))
try:
st = os.stat(os.path.join(root, f))
except OSError as e:
if e.errno == os.errno.ENOENT:
continue
raise
if most_recent_change is None:
most_recent_change = st.st_mtime
else:

Loading…
Cancel
Save