Add a --forever flag to continuously run tests as things change.

Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83451760
pull/1/merge
ctiller 10 years ago committed by Tim Emiola
parent e5206aace9
commit 3040cb7c43
  1. 155
      tools/run_tests/jobset.py
  2. 57
      tools/run_tests/run_tests.py
  3. 46
      tools/run_tests/watch_dirs.py

@ -4,14 +4,11 @@ import multiprocessing
import random
import subprocess
import sys
import threading
import tempfile
import time
# multiplicative factor to over subscribe CPU cores
# (many tests sleep for a long time)
# NOTE(review): _OVERSUBSCRIBE, _active_jobs and _output_lock belong to the
# thread-per-job design this diff removes; only _MAX_JOBS is used by the new
# polling Jobset below -- confirm against the applied (post-diff) file.
_OVERSUBSCRIBE = 32
_active_jobs = threading.Semaphore(
multiprocessing.cpu_count() * _OVERSUBSCRIBE)
_output_lock = threading.Lock()
# Hard cap on concurrently running jobs: 16 per CPU core.
_MAX_JOBS = 16 * multiprocessing.cpu_count()
def shuffle_iteratable(it):
@ -25,7 +22,7 @@ def shuffle_iteratable(it):
p = 1
for val in it:
if random.randint(0, p) == 0:
p *= 2
p = min(p*2, 100)
yield val
else:
nextit.append(val)
@ -36,53 +33,107 @@ def shuffle_iteratable(it):
yield val
_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()
class Job(object):
"""Manages one job."""
def __init__(self, cmdline):
self._cmdline = ' '.join(cmdline)
self._tempfile = tempfile.TemporaryFile()
self._process = subprocess.Popen(args=cmdline,
stderr=subprocess.STDOUT,
stdout=self._tempfile)
self._state = _RUNNING
sys.stdout.write('\x1b[0G\x1b[2K\x1b[33mSTART\x1b[0m: %s' %
self._cmdline)
sys.stdout.flush()
def state(self):
"""Poll current state of the job. Prints messages at completion."""
if self._state == _RUNNING and self._process.poll() is not None:
if self._process.returncode != 0:
self._state = _FAILURE
self._tempfile.seek(0)
stdout = self._tempfile.read()
sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s'
' [ret=%d]\n'
'%s\n' % (
self._cmdline, self._process.returncode, stdout))
sys.stdout.flush()
else:
self._state = _SUCCESS
sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' %
self._cmdline)
sys.stdout.flush()
return self._state
def kill(self):
if self._state == _RUNNING:
self._state = _KILLED
self._process.terminate()
class Jobset(object):
"""Manages one run of jobs."""
def __init__(self, cmdlines):
self._cmdlines = shuffle_iteratable(cmdlines)
def __init__(self, check_cancelled):
self._running = set()
self._check_cancelled = check_cancelled
self._cancelled = False
self._failures = 0
def _run_thread(self, cmdline):
try:
# start the process
p = subprocess.Popen(args=cmdline,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE)
stdout, _ = p.communicate()
# log output (under a lock)
_output_lock.acquire()
try:
if p.returncode != 0:
sys.stdout.write('\x1b[0G\x1b[2K\x1b[31mFAILED\x1b[0m: %s'
' [ret=%d]\n'
'%s\n' % (
' '.join(cmdline), p.returncode,
stdout))
self._failures += 1
else:
sys.stdout.write('\x1b[0G\x1b[2K\x1b[32mPASSED\x1b[0m: %s' %
' '.join(cmdline))
sys.stdout.flush()
finally:
_output_lock.release()
finally:
_active_jobs.release()
def run(self):
threads = []
for cmdline in self._cmdlines:
# cap number of active jobs - release in _run_thread
_active_jobs.acquire()
t = threading.Thread(target=self._run_thread,
args=[cmdline])
t.start()
threads.append(t)
for thread in threads:
thread.join()
return self._failures == 0
def run(cmdlines):
return Jobset(cmdlines).run()
def start(self, cmdline):
"""Start a job. Return True on success, False on failure."""
while len(self._running) >= _MAX_JOBS:
if self.cancelled(): return False
self.reap()
if self.cancelled(): return False
self._running.add(Job(cmdline))
return True
def reap(self):
"""Collect the dead jobs."""
while self._running:
dead = set()
for job in self._running:
st = job.state()
if st == _RUNNING: continue
if st == _FAILURE: self._failures += 1
dead.add(job)
for job in dead:
self._running.remove(job)
if not dead: return
time.sleep(0.1)
def cancelled(self):
"""Poll for cancellation."""
if self._cancelled: return True
if not self._check_cancelled(): return False
for job in self._running:
job.kill()
self._cancelled = True
return True
def finish(self):
while self._running:
if self.cancelled(): pass # poll cancellation
self.reap()
return not self.cancelled() and self._failures == 0
def _never_cancelled():
return False
def run(cmdlines, check_cancelled=_never_cancelled):
js = Jobset(check_cancelled)
for cmdline in shuffle_iteratable(cmdlines):
if not js.start(cmdline):
break
return js.finish()

@ -6,8 +6,10 @@ import glob
import itertools
import multiprocessing
import sys
import time
import jobset
import watch_dirs
# supported build configurations; each name is handed to make as CONFIG=<name>
_CONFIGS = ['dbg', 'opt', 'tsan', 'msan', 'asan']
@ -20,6 +22,10 @@ argp.add_argument('-c', '--config',
default=['all'])
argp.add_argument('-t', '--test-filter', nargs='*', default=['*'])
argp.add_argument('-n', '--runs_per_test', default=1, type=int)
argp.add_argument('-f', '--forever',
default=False,
action='store_const',
const=True)
args = argp.parse_args()
# grab config
@ -29,21 +35,38 @@ configs = [cfg
for x in args.config)]
filters = args.test_filter
runs_per_test = args.runs_per_test
forever = args.forever
def _build_and_run(check_cancelled):
  """Do one pass of building & running tests."""
  # Split the available cores between the per-config makes.
  make_jobs = max(multiprocessing.cpu_count() / len(configs), 1)
  build_steps = (['make',
                  '-j', '%d' % make_jobs,
                  'buildtests_c',
                  'CONFIG=%s' % cfg]
                 for cfg in configs)
  if not jobset.run(build_steps, check_cancelled):
    sys.exit(1)
  # Expand every config/filter pair into its matching test binaries, each
  # repeated runs_per_test times, then run each binary as its own job.
  test_binaries = itertools.chain.from_iterable(
      itertools.chain.from_iterable(itertools.repeat(
          glob.glob('bins/%s/%s_test' % (config, filt)),
          runs_per_test))
      for config in configs
      for filt in filters)
  jobset.run(([binary] for binary in test_binaries), check_cancelled)
if forever:
  # Keep rebuilding and re-running for as long as the watched trees change.
  while True:
    dw = watch_dirs.DirWatcher(['src', 'include', 'test'])
    initial_time = dw.most_recent_change()
    # Cancellation predicate: has anything changed since this pass started?
    def have_files_changed():
      return dw.most_recent_change() != initial_time
    _build_and_run(have_files_changed)
    # Idle until the next change kicks off another pass.
    while not have_files_changed():
      time.sleep(1)
else:
  # Single pass, never cancelled.
  _build_and_run(lambda: False)

@ -0,0 +1,46 @@
"""Helper to watch a (set) of directories for modifications."""
import os
import threading
import time
class DirWatcher(object):
  """Helper to watch a (set) of directories for modifications."""

  def __init__(self, paths):
    # Accept either a single path or a collection of paths.
    if isinstance(paths, basestring):
      paths = [paths]
    self._mu = threading.Lock()
    self._done = False
    self.paths = list(paths)
    self.lastrun = time.time()
    # Prime the cache so the first query has a baseline.
    self._cache = self._calculate()

  def _calculate(self):
    """Walk over all subscribed paths, check most recent mtime."""
    latest = None
    for path in self.paths:
      # Skip anything missing or not a directory.
      if not os.path.exists(path):
        continue
      if not os.path.isdir(path):
        continue
      for root, _, files in os.walk(path):
        for name in files:
          mtime = os.stat(os.path.join(root, name)).st_mtime
          latest = mtime if latest is None else max(latest, mtime)
    return latest

  def most_recent_change(self):
    """Return the most recent mtime seen, recomputing at most once a second."""
    with self._mu:
      if time.time() - self.lastrun > 1:
        self._cache = self._calculate()
        self.lastrun = time.time()
      return self._cache
Loading…
Cancel
Save