Merge branch 'faster_runtests' into %s

pull/7025/head
Craig Tiller 9 years ago
commit 019021b39f
2 changed files:
  1. tools/run_tests/jobset.py (68 changes)
  2. tools/run_tests/run_tests.py (63 changes)

tools/run_tests/jobset.py

@@ -29,7 +29,6 @@
 """Run a group of subprocesses and then finish."""
-import hashlib
 import multiprocessing
 import os
 import platform
@@ -149,7 +148,7 @@ def which(filename):
 class JobSpec(object):
   """Specifies what to run for a job."""
-  def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
+  def __init__(self, cmdline, shortname=None, environ=None,
                cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
                timeout_retries=0, kill_handler=None, cpu_cost=1.0,
                verbose_success=False):
@@ -157,19 +156,14 @@ class JobSpec(object):
     Arguments:
       cmdline: a list of arguments to pass as the command line
       environ: a dictionary of environment variables to set in the child process
-      hash_targets: which files to include in the hash representing the jobs version
-        (or empty, indicating the job should not be hashed)
       kill_handler: a handler that will be called whenever job.kill() is invoked
       cpu_cost: number of cores per second this job needs
     """
     if environ is None:
       environ = {}
-    if hash_targets is None:
-      hash_targets = []
     self.cmdline = cmdline
     self.environ = environ
     self.shortname = cmdline[0] if shortname is None else shortname
-    self.hash_targets = hash_targets or []
     self.cwd = cwd
     self.shell = shell
     self.timeout_seconds = timeout_seconds
@@ -180,7 +174,7 @@ class JobSpec(object):
     self.verbose_success = verbose_success

   def identity(self):
-    return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
+    return '%r %r' % (self.cmdline, self.environ)

   def __hash__(self):
     return hash(self.identity())
@@ -205,9 +199,8 @@ class JobResult(object):
 class Job(object):
   """Manages one job."""

-  def __init__(self, spec, bin_hash, newline_on_success, travis, add_env):
+  def __init__(self, spec, newline_on_success, travis, add_env):
     self._spec = spec
-    self._bin_hash = bin_hash
     self._newline_on_success = newline_on_success
     self._travis = travis
     self._add_env = add_env.copy()
@@ -249,7 +242,7 @@ class Job(object):
     self._process = try_start()
     self._state = _RUNNING

-  def state(self, update_cache):
+  def state(self):
     """Poll current state of the job. Prints messages at completion."""
     def stdout(self=self):
       self._tempfile.seek(0)
@@ -293,8 +286,6 @@ class Job(object):
               stdout() if self._spec.verbose_success else None,
               do_newline=self._newline_on_success or self._travis)
         self.result.state = 'PASSED'
-        if self._bin_hash:
-          update_cache.finished(self._spec.identity(), self._bin_hash)
     elif (self._state == _RUNNING and
           self._spec.timeout_seconds is not None and
           time.time() - self._start > self._spec.timeout_seconds):
@@ -329,7 +320,7 @@ class Jobset(object):
   """Manages one run of jobs."""

   def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
-               stop_on_failure, add_env, cache):
+               stop_on_failure, add_env):
     self._running = set()
     self._check_cancelled = check_cancelled
     self._cancelled = False
@@ -338,9 +329,7 @@ class Jobset(object):
     self._maxjobs = maxjobs
     self._newline_on_success = newline_on_success
     self._travis = travis
-    self._cache = cache
     self._stop_on_failure = stop_on_failure
-    self._hashes = {}
     self._add_env = add_env
     self.resultset = {}
     self._remaining = None
@@ -367,37 +356,21 @@ class Jobset(object):
       if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
       self.reap()
     if self.cancelled(): return False
-    if spec.hash_targets:
-      if spec.identity() in self._hashes:
-        bin_hash = self._hashes[spec.identity()]
-      else:
-        bin_hash = hashlib.sha1()
-        for fn in spec.hash_targets:
-          with open(which(fn)) as f:
-            bin_hash.update(f.read())
-        bin_hash = bin_hash.hexdigest()
-        self._hashes[spec.identity()] = bin_hash
-      should_run = self._cache.should_run(spec.identity(), bin_hash)
-    else:
-      bin_hash = None
-      should_run = True
-    if should_run:
-      job = Job(spec,
-                bin_hash,
-                self._newline_on_success,
-                self._travis,
-                self._add_env)
-      self._running.add(job)
-      if not self.resultset.has_key(job.GetSpec().shortname):
-        self.resultset[job.GetSpec().shortname] = []
+    job = Job(spec,
+              self._newline_on_success,
+              self._travis,
+              self._add_env)
+    self._running.add(job)
+    if not self.resultset.has_key(job.GetSpec().shortname):
+      self.resultset[job.GetSpec().shortname] = []
     return True

   def reap(self):
     """Collect the dead jobs."""
     while self._running:
       dead = set()
       for job in self._running:
-        st = job.state(self._cache)
+        st = job.state()
         if st == _RUNNING: continue
         if st == _FAILURE or st == _KILLED:
           self._failures += 1
@@ -450,15 +423,6 @@ def _never_cancelled():
   return False

-# cache class that caches nothing
-class NoCache(object):
-  def should_run(self, cmdline, bin_hash):
-    return True
-
-  def finished(self, cmdline, bin_hash):
-    pass
-

 def tag_remaining(xs):
   staging = []
   for x in xs:
@@ -477,12 +441,10 @@ def run(cmdlines,
         travis=False,
         infinite_runs=False,
         stop_on_failure=False,
-        cache=None,
         add_env={}):
   js = Jobset(check_cancelled,
               maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
-              newline_on_success, travis, stop_on_failure, add_env,
-              cache if cache is not None else NoCache())
+              newline_on_success, travis, stop_on_failure, add_env)
   for cmdline, remaining in tag_remaining(cmdlines):
     if not js.start(cmdline):
       break
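
With the cache gone, jobset's surface for callers reduces to JobSpec plus run(). Below is a minimal sketch of the simplified API as it stands after this diff; the test binary, environment variable, and maxjobs value are made up for illustration, and parameters not shown in the hunks above (check_cancelled and friends) are assumed to keep their existing defaults:

import jobset

# Hypothetical test binary; JobSpec no longer accepts hash_targets.
spec = jobset.JobSpec(cmdline=['bins/opt/end2end_test'],
                      shortname='end2end_test',
                      environ={'GRPC_TRACE': 'api'},  # illustrative only
                      timeout_seconds=5 * 60,
                      cpu_cost=1.0)

# run() has lost its cache= keyword; every spec is started unconditionally.
num_failures, resultset = jobset.run([spec],
                                     maxjobs=16,
                                     newline_on_success=True)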

tools/run_tests/run_tests.py

@@ -33,7 +33,6 @@
 import argparse
 import ast
 import glob
-import hashlib
 import itertools
 import json
 import multiprocessing
@ -78,24 +77,18 @@ class Config(object):
if environ is None: if environ is None:
environ = {} environ = {}
self.build_config = config self.build_config = config
self.allow_hashing = (config != 'gcov')
self.environ = environ self.environ = environ
self.environ['CONFIG'] = config self.environ['CONFIG'] = config
self.tool_prefix = tool_prefix self.tool_prefix = tool_prefix
self.timeout_multiplier = timeout_multiplier self.timeout_multiplier = timeout_multiplier
def job_spec(self, cmdline, hash_targets, timeout_seconds=5*60, def job_spec(self, cmdline, timeout_seconds=5*60,
shortname=None, environ={}, cpu_cost=1.0, flaky=False): shortname=None, environ={}, cpu_cost=1.0, flaky=False):
"""Construct a jobset.JobSpec for a test under this config """Construct a jobset.JobSpec for a test under this config
Args: Args:
cmdline: a list of strings specifying the command line the test cmdline: a list of strings specifying the command line the test
would like to run would like to run
hash_targets: either None (don't do caching of test results), or
a list of strings specifying files to include in a
binary hash to check if a test has changed
-- if used, all artifacts needed to run the test must
be listed
""" """
actual_environ = self.environ.copy() actual_environ = self.environ.copy()
for k, v in environ.iteritems(): for k, v in environ.iteritems():
@@ -105,8 +98,6 @@ class Config(object):
                           environ=actual_environ,
                           cpu_cost=cpu_cost,
                           timeout_seconds=(self.timeout_multiplier * timeout_seconds if timeout_seconds else None),
-                          hash_targets=hash_targets
-                                       if self.allow_hashing else None,
                           flake_retries=5 if flaky or args.allow_flakes else 0,
                           timeout_retries=3 if args.allow_flakes else 0)
@@ -425,7 +416,7 @@ class PythonLanguage(object):
     return []

   def build_steps(self):
     return [['tools/run_tests/build_python.sh', tox_env]
             for tox_env in self._tox_envs]

   def post_tests_steps(self):
@@ -1058,46 +1049,6 @@
 runs_per_test = args.runs_per_test
 forever = args.forever

-class TestCache(object):
-  """Cache for running tests."""
-
-  def __init__(self, use_cache_results):
-    self._last_successful_run = {}
-    self._use_cache_results = use_cache_results
-    self._last_save = time.time()
-
-  def should_run(self, cmdline, bin_hash):
-    if cmdline not in self._last_successful_run:
-      return True
-    if self._last_successful_run[cmdline] != bin_hash:
-      return True
-    if not self._use_cache_results:
-      return True
-    return False
-
-  def finished(self, cmdline, bin_hash):
-    self._last_successful_run[cmdline] = bin_hash
-    if time.time() - self._last_save > 1:
-      self.save()
-
-  def dump(self):
-    return [{'cmdline': k, 'hash': v}
-            for k, v in self._last_successful_run.iteritems()]
-
-  def parse(self, exdump):
-    self._last_successful_run = dict((o['cmdline'], o['hash']) for o in exdump)
-
-  def save(self):
-    with open('.run_tests_cache', 'w') as f:
-      f.write(json.dumps(self.dump()))
-    self._last_save = time.time()
-
-  def maybe_load(self):
-    if os.path.exists('.run_tests_cache'):
-      with open('.run_tests_cache') as f:
-        self.parse(json.loads(f.read()))

 def _start_port_server(port_server_port):
   # check if a compatible port server is running
   # if incompatible (version mismatch) ==> start a new one
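
For reference, the deleted TestCache persisted state in a .run_tests_cache file between runs. Judging from its dump() and save() methods above, the file held a JSON list keyed by JobSpec.identity() strings; a reconstructed sketch of its shape, with illustrative values:

# Shape of the old .run_tests_cache contents (values illustrative):
cache_contents = [
    {'cmdline': "['bins/opt/end2end_test'] {'CONFIG': 'opt'} []",  # JobSpec.identity()
     'hash': '0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33'},          # sha1 of hash_targets
]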
@@ -1217,7 +1168,7 @@ class BuildAndRunError(object):

 # returns a list of things that failed (or an empty list on success)
 def _build_and_run(
-    check_cancelled, newline_on_success, cache, xml_report=None, build_only=False):
+    check_cancelled, newline_on_success, xml_report=None, build_only=False):
   """Do one pass of building & running tests."""
   # build latest sequentially
   num_failures, resultset = jobset.run(
@@ -1266,7 +1217,6 @@ def _build_and_run(
       all_runs, check_cancelled, newline_on_success=newline_on_success,
       travis=args.travis, infinite_runs=infinite_runs, maxjobs=args.jobs,
       stop_on_failure=args.stop_on_failure,
-      cache=cache if not xml_report else None,
       add_env={'GRPC_TEST_PORT_SERVER': 'localhost:%d' % port_server_port})
   if resultset:
     for k, v in sorted(resultset.items()):
@@ -1295,14 +1245,9 @@ def _build_and_run(
   if num_test_failures:
     out.append(BuildAndRunError.TEST)

-  if cache: cache.save()
-
   return out

-test_cache = TestCache(runs_per_test == 1)
-test_cache.maybe_load()
-
 if forever:
   success = True
   while True:
@@ -1312,7 +1257,6 @@ if forever:
     previous_success = success
     errors = _build_and_run(check_cancelled=have_files_changed,
                             newline_on_success=False,
-                            cache=test_cache,
                             build_only=args.build_only) == 0
     if not previous_success and not errors:
       jobset.message('SUCCESS',
@@ -1324,7 +1268,6 @@ if forever:
 else:
   errors = _build_and_run(check_cancelled=lambda: False,
                           newline_on_success=args.newline_on_success,
-                          cache=test_cache,
                           xml_report=args.xml_report,
                           build_only=args.build_only)
   if not errors:
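
Correspondingly, every caller of Config.job_spec drops the old hash_targets positional argument. A sketch of a call under the new signature; the command line and shortname are hypothetical:

spec = config.job_spec(['bins/opt/foo_test'],   # hypothetical test binary
                       shortname='foo_test',
                       timeout_seconds=5 * 60,
                       cpu_cost=1.0,
                       flaky=False)

As before, timeout_seconds is scaled by the config's timeout_multiplier, and flaky tests still get flake_retries=5 (see the hunk at -105 above).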
