From 05687c3da9848ab79f08417793965de9bbbb52b0 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 11 Dec 2017 16:54:47 -0800 Subject: [PATCH] yapf tools/run_tests/python_utils --- tools/distrib/yapf_code.sh | 1 + tools/run_tests/python_utils/antagonist.py | 3 +- tools/run_tests/python_utils/comment_on_pr.py | 33 +- tools/run_tests/python_utils/dockerjob.py | 162 ++-- .../python_utils/filter_pull_request_tests.py | 200 +++-- tools/run_tests/python_utils/jobset.py | 826 +++++++++--------- tools/run_tests/python_utils/port_server.py | 253 +++--- tools/run_tests/python_utils/report_utils.py | 219 ++--- .../python_utils/start_port_server.py | 11 +- .../python_utils/upload_test_results.py | 231 ++--- tools/run_tests/python_utils/watch_dirs.py | 79 +- 11 files changed, 1057 insertions(+), 961 deletions(-) diff --git a/tools/distrib/yapf_code.sh b/tools/distrib/yapf_code.sh index 0db4cf35765..9f8fb13572b 100755 --- a/tools/distrib/yapf_code.sh +++ b/tools/distrib/yapf_code.sh @@ -25,6 +25,7 @@ DIRS=( 'tools/distrib' 'tools/interop_matrix' 'tools/profiling' + 'tools/run_tests/python_utils' ) EXCLUSIONS=( 'grpcio/grpc_*.py' diff --git a/tools/run_tests/python_utils/antagonist.py b/tools/run_tests/python_utils/antagonist.py index 0d79ce09868..a928a4cb007 100755 --- a/tools/run_tests/python_utils/antagonist.py +++ b/tools/run_tests/python_utils/antagonist.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """This is used by run_tests.py to create cpu load on a machine""" while True: - pass + pass diff --git a/tools/run_tests/python_utils/comment_on_pr.py b/tools/run_tests/python_utils/comment_on_pr.py index 21b9bb70859..399c996d4db 100644 --- a/tools/run_tests/python_utils/comment_on_pr.py +++ b/tools/run_tests/python_utils/comment_on_pr.py @@ -16,19 +16,22 @@ import os import json import urllib2 + def comment_on_pr(text): - if 'JENKINS_OAUTH_TOKEN' not in os.environ: - print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting' - return - if 'ghprbPullId' not in os.environ: - print 'Missing ghprbPullId env var: not commenting' - return - req = urllib2.Request( - url = 'https://api.github.com/repos/grpc/grpc/issues/%s/comments' % - os.environ['ghprbPullId'], - data = json.dumps({'body': text}), - headers = { - 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'], - 'Content-Type': 'application/json', - }) - print urllib2.urlopen(req).read() + if 'JENKINS_OAUTH_TOKEN' not in os.environ: + print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting' + return + if 'ghprbPullId' not in os.environ: + print 'Missing ghprbPullId env var: not commenting' + return + req = urllib2.Request( + url='https://api.github.com/repos/grpc/grpc/issues/%s/comments' % + os.environ['ghprbPullId'], + data=json.dumps({ + 'body': text + }), + headers={ + 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'], + 'Content-Type': 'application/json', + }) + print urllib2.urlopen(req).read() diff --git a/tools/run_tests/python_utils/dockerjob.py b/tools/run_tests/python_utils/dockerjob.py index 2f5285b26c9..d2941c08118 100755 --- a/tools/run_tests/python_utils/dockerjob.py +++ b/tools/run_tests/python_utils/dockerjob.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Helpers to run docker instances as jobs.""" from __future__ import print_function @@ -28,102 +27,109 @@ _DEVNULL = open(os.devnull, 'w') def random_name(base_name): - """Randomizes given base name.""" - return '%s_%s' % (base_name, uuid.uuid4()) + """Randomizes given base name.""" + return '%s_%s' % (base_name, uuid.uuid4()) def docker_kill(cid): - """Kills a docker container. Returns True if successful.""" - return subprocess.call(['docker','kill', str(cid)], - stdin=subprocess.PIPE, - stdout=_DEVNULL, - stderr=subprocess.STDOUT) == 0 + """Kills a docker container. Returns True if successful.""" + return subprocess.call( + ['docker', 'kill', str(cid)], + stdin=subprocess.PIPE, + stdout=_DEVNULL, + stderr=subprocess.STDOUT) == 0 def docker_mapped_port(cid, port, timeout_seconds=15): - """Get port mapped to internal given internal port for given container.""" - started = time.time() - while time.time() - started < timeout_seconds: - try: - output = subprocess.check_output('docker port %s %s' % (cid, port), - stderr=_DEVNULL, - shell=True) - return int(output.split(':', 2)[1]) - except subprocess.CalledProcessError as e: - pass - raise Exception('Failed to get exposed port %s for container %s.' % - (port, cid)) + """Get port mapped to internal given internal port for given container.""" + started = time.time() + while time.time() - started < timeout_seconds: + try: + output = subprocess.check_output( + 'docker port %s %s' % (cid, port), stderr=_DEVNULL, shell=True) + return int(output.split(':', 2)[1]) + except subprocess.CalledProcessError as e: + pass + raise Exception('Failed to get exposed port %s for container %s.' % + (port, cid)) def wait_for_healthy(cid, shortname, timeout_seconds): - """Wait timeout_seconds for the container to become healthy""" - started = time.time() - while time.time() - started < timeout_seconds: - try: - output = subprocess.check_output( - ['docker', 'inspect', '--format="{{.State.Health.Status}}"', cid], - stderr=_DEVNULL) - if output.strip('\n') == 'healthy': - return - except subprocess.CalledProcessError as e: - pass - time.sleep(1) - raise Exception('Timed out waiting for %s (%s) to pass health check' % - (shortname, cid)) + """Wait timeout_seconds for the container to become healthy""" + started = time.time() + while time.time() - started < timeout_seconds: + try: + output = subprocess.check_output( + [ + 'docker', 'inspect', '--format="{{.State.Health.Status}}"', + cid + ], + stderr=_DEVNULL) + if output.strip('\n') == 'healthy': + return + except subprocess.CalledProcessError as e: + pass + time.sleep(1) + raise Exception('Timed out waiting for %s (%s) to pass health check' % + (shortname, cid)) def finish_jobs(jobs): - """Kills given docker containers and waits for corresponding jobs to finish""" - for job in jobs: - job.kill(suppress_failure=True) + """Kills given docker containers and waits for corresponding jobs to finish""" + for job in jobs: + job.kill(suppress_failure=True) - while any(job.is_running() for job in jobs): - time.sleep(1) + while any(job.is_running() for job in jobs): + time.sleep(1) def image_exists(image): - """Returns True if given docker image exists.""" - return subprocess.call(['docker','inspect', image], - stdin=subprocess.PIPE, - stdout=_DEVNULL, - stderr=subprocess.STDOUT) == 0 + """Returns True if given docker image exists.""" + return subprocess.call( + ['docker', 'inspect', image], + stdin=subprocess.PIPE, + stdout=_DEVNULL, + stderr=subprocess.STDOUT) == 0 def remove_image(image, skip_nonexistent=False, max_retries=10): - """Attempts to remove docker image with retries.""" - if skip_nonexistent and not image_exists(image): - return True - for attempt in range(0, max_retries): - if subprocess.call(['docker','rmi', '-f', image], - stdin=subprocess.PIPE, - stdout=_DEVNULL, - stderr=subprocess.STDOUT) == 0: - return True - time.sleep(2) - print('Failed to remove docker image %s' % image) - return False + """Attempts to remove docker image with retries.""" + if skip_nonexistent and not image_exists(image): + return True + for attempt in range(0, max_retries): + if subprocess.call( + ['docker', 'rmi', '-f', image], + stdin=subprocess.PIPE, + stdout=_DEVNULL, + stderr=subprocess.STDOUT) == 0: + return True + time.sleep(2) + print('Failed to remove docker image %s' % image) + return False class DockerJob: - """Encapsulates a job""" - - def __init__(self, spec): - self._spec = spec - self._job = jobset.Job(spec, newline_on_success=True, travis=True, add_env={}) - self._container_name = spec.container_name - - def mapped_port(self, port): - return docker_mapped_port(self._container_name, port) - - def wait_for_healthy(self, timeout_seconds): - wait_for_healthy(self._container_name, self._spec.shortname, timeout_seconds) - - def kill(self, suppress_failure=False): - """Sends kill signal to the container.""" - if suppress_failure: - self._job.suppress_failure_message() - return docker_kill(self._container_name) - - def is_running(self): - """Polls a job and returns True if given job is still running.""" - return self._job.state() == jobset._RUNNING + """Encapsulates a job""" + + def __init__(self, spec): + self._spec = spec + self._job = jobset.Job( + spec, newline_on_success=True, travis=True, add_env={}) + self._container_name = spec.container_name + + def mapped_port(self, port): + return docker_mapped_port(self._container_name, port) + + def wait_for_healthy(self, timeout_seconds): + wait_for_healthy(self._container_name, self._spec.shortname, + timeout_seconds) + + def kill(self, suppress_failure=False): + """Sends kill signal to the container.""" + if suppress_failure: + self._job.suppress_failure_message() + return docker_kill(self._container_name) + + def is_running(self): + """Polls a job and returns True if given job is still running.""" + return self._job.state() == jobset._RUNNING diff --git a/tools/run_tests/python_utils/filter_pull_request_tests.py b/tools/run_tests/python_utils/filter_pull_request_tests.py index e880734651e..8e0dc708dd7 100644 --- a/tools/run_tests/python_utils/filter_pull_request_tests.py +++ b/tools/run_tests/python_utils/filter_pull_request_tests.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Filter out tests based on file differences compared to merge target branch""" from __future__ import print_function @@ -23,24 +22,25 @@ from subprocess import check_output class TestSuite: - """ + """ Contains label to identify job as belonging to this test suite and triggers to identify if changed files are relevant """ - def __init__(self, labels): - """ + + def __init__(self, labels): + """ Build TestSuite to group tests based on labeling :param label: strings that should match a jobs's platform, config, language, or test group """ - self.triggers = [] - self.labels = labels + self.triggers = [] + self.labels = labels - def add_trigger(self, trigger): - """ + def add_trigger(self, trigger): + """ Add a regex to list of triggers that determine if a changed file should run tests :param trigger: regex matching file relevant to tests """ - self.triggers.append(trigger) + self.triggers.append(trigger) # Create test suites @@ -55,10 +55,11 @@ _RUBY_TEST_SUITE = TestSuite(['ruby']) _LINUX_TEST_SUITE = TestSuite(['linux']) _WINDOWS_TEST_SUITE = TestSuite(['windows']) _MACOS_TEST_SUITE = TestSuite(['macos']) -_ALL_TEST_SUITES = [_CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE, - _NODE_TEST_SUITE, _OBJC_TEST_SUITE, _PHP_TEST_SUITE, - _PYTHON_TEST_SUITE, _RUBY_TEST_SUITE, _LINUX_TEST_SUITE, - _WINDOWS_TEST_SUITE, _MACOS_TEST_SUITE] +_ALL_TEST_SUITES = [ + _CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE, _NODE_TEST_SUITE, + _OBJC_TEST_SUITE, _PHP_TEST_SUITE, _PYTHON_TEST_SUITE, _RUBY_TEST_SUITE, + _LINUX_TEST_SUITE, _WINDOWS_TEST_SUITE, _MACOS_TEST_SUITE +] # Dictionary of whitelistable files where the key is a regex matching changed files # and the value is a list of tests that should be run. An empty list means that @@ -66,46 +67,46 @@ _ALL_TEST_SUITES = [_CORE_TEST_SUITE, _CPP_TEST_SUITE, _CSHARP_TEST_SUITE, # match any of these regexes will trigger all tests # DO NOT CHANGE THIS UNLESS YOU KNOW WHAT YOU ARE DOING (be careful even if you do) _WHITELIST_DICT = { - '^doc/': [], - '^examples/': [], - '^include/grpc\+\+/': [_CPP_TEST_SUITE], - '^summerofcode/': [], - '^src/cpp/': [_CPP_TEST_SUITE], - '^src/csharp/': [_CSHARP_TEST_SUITE], - '^src/objective\-c/': [_OBJC_TEST_SUITE], - '^src/php/': [_PHP_TEST_SUITE], - '^src/python/': [_PYTHON_TEST_SUITE], - '^src/ruby/': [_RUBY_TEST_SUITE], - '^templates/': [], - '^test/core/': [_CORE_TEST_SUITE, _CPP_TEST_SUITE], - '^test/cpp/': [_CPP_TEST_SUITE], - '^test/distrib/cpp/': [_CPP_TEST_SUITE], - '^test/distrib/csharp/': [_CSHARP_TEST_SUITE], - '^test/distrib/php/': [_PHP_TEST_SUITE], - '^test/distrib/python/': [_PYTHON_TEST_SUITE], - '^test/distrib/ruby/': [_RUBY_TEST_SUITE], - '^vsprojects/': [_WINDOWS_TEST_SUITE], - 'composer\.json$': [_PHP_TEST_SUITE], - 'config\.m4$': [_PHP_TEST_SUITE], - 'CONTRIBUTING\.md$': [], - 'Gemfile$': [_RUBY_TEST_SUITE], - 'grpc\.def$': [_WINDOWS_TEST_SUITE], - 'grpc\.gemspec$': [_RUBY_TEST_SUITE], - 'gRPC\.podspec$': [_OBJC_TEST_SUITE], - 'gRPC\-Core\.podspec$': [_OBJC_TEST_SUITE], - 'gRPC\-ProtoRPC\.podspec$': [_OBJC_TEST_SUITE], - 'gRPC\-RxLibrary\.podspec$': [_OBJC_TEST_SUITE], - 'INSTALL\.md$': [], - 'LICENSE$': [], - 'MANIFEST\.md$': [], - 'package\.json$': [_PHP_TEST_SUITE], - 'package\.xml$': [_PHP_TEST_SUITE], - 'PATENTS$': [], - 'PYTHON\-MANIFEST\.in$': [_PYTHON_TEST_SUITE], - 'README\.md$': [], - 'requirements\.txt$': [_PYTHON_TEST_SUITE], - 'setup\.cfg$': [_PYTHON_TEST_SUITE], - 'setup\.py$': [_PYTHON_TEST_SUITE] + '^doc/': [], + '^examples/': [], + '^include/grpc\+\+/': [_CPP_TEST_SUITE], + '^summerofcode/': [], + '^src/cpp/': [_CPP_TEST_SUITE], + '^src/csharp/': [_CSHARP_TEST_SUITE], + '^src/objective\-c/': [_OBJC_TEST_SUITE], + '^src/php/': [_PHP_TEST_SUITE], + '^src/python/': [_PYTHON_TEST_SUITE], + '^src/ruby/': [_RUBY_TEST_SUITE], + '^templates/': [], + '^test/core/': [_CORE_TEST_SUITE, _CPP_TEST_SUITE], + '^test/cpp/': [_CPP_TEST_SUITE], + '^test/distrib/cpp/': [_CPP_TEST_SUITE], + '^test/distrib/csharp/': [_CSHARP_TEST_SUITE], + '^test/distrib/php/': [_PHP_TEST_SUITE], + '^test/distrib/python/': [_PYTHON_TEST_SUITE], + '^test/distrib/ruby/': [_RUBY_TEST_SUITE], + '^vsprojects/': [_WINDOWS_TEST_SUITE], + 'composer\.json$': [_PHP_TEST_SUITE], + 'config\.m4$': [_PHP_TEST_SUITE], + 'CONTRIBUTING\.md$': [], + 'Gemfile$': [_RUBY_TEST_SUITE], + 'grpc\.def$': [_WINDOWS_TEST_SUITE], + 'grpc\.gemspec$': [_RUBY_TEST_SUITE], + 'gRPC\.podspec$': [_OBJC_TEST_SUITE], + 'gRPC\-Core\.podspec$': [_OBJC_TEST_SUITE], + 'gRPC\-ProtoRPC\.podspec$': [_OBJC_TEST_SUITE], + 'gRPC\-RxLibrary\.podspec$': [_OBJC_TEST_SUITE], + 'INSTALL\.md$': [], + 'LICENSE$': [], + 'MANIFEST\.md$': [], + 'package\.json$': [_PHP_TEST_SUITE], + 'package\.xml$': [_PHP_TEST_SUITE], + 'PATENTS$': [], + 'PYTHON\-MANIFEST\.in$': [_PYTHON_TEST_SUITE], + 'README\.md$': [], + 'requirements\.txt$': [_PYTHON_TEST_SUITE], + 'setup\.cfg$': [_PYTHON_TEST_SUITE], + 'setup\.py$': [_PYTHON_TEST_SUITE] } # Regex that combines all keys in _WHITELIST_DICT @@ -113,83 +114,88 @@ _ALL_TRIGGERS = "(" + ")|(".join(_WHITELIST_DICT.keys()) + ")" # Add all triggers to their respective test suites for trigger, test_suites in six.iteritems(_WHITELIST_DICT): - for test_suite in test_suites: - test_suite.add_trigger(trigger) + for test_suite in test_suites: + test_suite.add_trigger(trigger) def _get_changed_files(base_branch): - """ + """ Get list of changed files between current branch and base of target merge branch """ - # Get file changes between branch and merge-base of specified branch - # Not combined to be Windows friendly - base_commit = check_output(["git", "merge-base", base_branch, "HEAD"]).rstrip() - return check_output(["git", "diff", base_commit, "--name-only", "HEAD"]).splitlines() + # Get file changes between branch and merge-base of specified branch + # Not combined to be Windows friendly + base_commit = check_output( + ["git", "merge-base", base_branch, "HEAD"]).rstrip() + return check_output( + ["git", "diff", base_commit, "--name-only", "HEAD"]).splitlines() def _can_skip_tests(file_names, triggers): - """ + """ Determines if tests are skippable based on if all files do not match list of regexes :param file_names: list of changed files generated by _get_changed_files() :param triggers: list of regexes matching file name that indicates tests should be run :return: safe to skip tests """ - for file_name in file_names: - if any(re.match(trigger, file_name) for trigger in triggers): - return False - return True + for file_name in file_names: + if any(re.match(trigger, file_name) for trigger in triggers): + return False + return True def _remove_irrelevant_tests(tests, skippable_labels): - """ + """ Filters out tests by config or language - will not remove sanitizer tests :param tests: list of all tests generated by run_tests_matrix.py :param skippable_labels: list of languages and platforms with skippable tests :return: list of relevant tests """ - # test.labels[0] is platform and test.labels[2] is language - # We skip a test if both are considered safe to skip - return [test for test in tests if test.labels[0] not in skippable_labels or \ - test.labels[2] not in skippable_labels] + # test.labels[0] is platform and test.labels[2] is language + # We skip a test if both are considered safe to skip + return [test for test in tests if test.labels[0] not in skippable_labels or \ + test.labels[2] not in skippable_labels] def affects_c_cpp(base_branch): - """ + """ Determines if a pull request's changes affect C/C++. This function exists because there are pull request tests that only test C/C++ code :param base_branch: branch that a pull request is requesting to merge into :return: boolean indicating whether C/C++ changes are made in pull request """ - changed_files = _get_changed_files(base_branch) - # Run all tests if any changed file is not in the whitelist dictionary - for changed_file in changed_files: - if not re.match(_ALL_TRIGGERS, changed_file): - return True - return not _can_skip_tests(changed_files, _CPP_TEST_SUITE.triggers + _CORE_TEST_SUITE.triggers) + changed_files = _get_changed_files(base_branch) + # Run all tests if any changed file is not in the whitelist dictionary + for changed_file in changed_files: + if not re.match(_ALL_TRIGGERS, changed_file): + return True + return not _can_skip_tests( + changed_files, _CPP_TEST_SUITE.triggers + _CORE_TEST_SUITE.triggers) def filter_tests(tests, base_branch): - """ + """ Filters out tests that are safe to ignore :param tests: list of all tests generated by run_tests_matrix.py :return: list of relevant tests """ - print('Finding file differences between gRPC %s branch and pull request...\n' % base_branch) - changed_files = _get_changed_files(base_branch) - for changed_file in changed_files: - print(' %s' % changed_file) - print('') - - # Run all tests if any changed file is not in the whitelist dictionary - for changed_file in changed_files: - if not re.match(_ALL_TRIGGERS, changed_file): - return(tests) - # Figure out which language and platform tests to run - skippable_labels = [] - for test_suite in _ALL_TEST_SUITES: - if _can_skip_tests(changed_files, test_suite.triggers): - for label in test_suite.labels: - print(' %s tests safe to skip' % label) - skippable_labels.append(label) - tests = _remove_irrelevant_tests(tests, skippable_labels) - return tests + print( + 'Finding file differences between gRPC %s branch and pull request...\n' + % base_branch) + changed_files = _get_changed_files(base_branch) + for changed_file in changed_files: + print(' %s' % changed_file) + print('') + + # Run all tests if any changed file is not in the whitelist dictionary + for changed_file in changed_files: + if not re.match(_ALL_TRIGGERS, changed_file): + return (tests) + # Figure out which language and platform tests to run + skippable_labels = [] + for test_suite in _ALL_TEST_SUITES: + if _can_skip_tests(changed_files, test_suite.triggers): + for label in test_suite.labels: + print(' %s tests safe to skip' % label) + skippable_labels.append(label) + tests = _remove_irrelevant_tests(tests, skippable_labels) + return tests diff --git a/tools/run_tests/python_utils/jobset.py b/tools/run_tests/python_utils/jobset.py index 85eef444ef8..424b4404a8d 100755 --- a/tools/run_tests/python_utils/jobset.py +++ b/tools/run_tests/python_utils/jobset.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Run a group of subprocesses and then finish.""" from __future__ import print_function @@ -28,11 +27,9 @@ import tempfile import time import errno - # cpu cost measurement measure_cpu_costs = False - _DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count() _MAX_RESULT_SIZE = 8192 @@ -42,63 +39,60 @@ _MAX_RESULT_SIZE = 8192 # characters to the PR description, which leak into the environment here # and cause failures. def strip_non_ascii_chars(s): - return ''.join(c for c in s if ord(c) < 128) + return ''.join(c for c in s if ord(c) < 128) def sanitized_environment(env): - sanitized = {} - for key, value in env.items(): - sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value) - return sanitized + sanitized = {} + for key, value in env.items(): + sanitized[strip_non_ascii_chars(key)] = strip_non_ascii_chars(value) + return sanitized def platform_string(): - if platform.system() == 'Windows': - return 'windows' - elif platform.system()[:7] == 'MSYS_NT': - return 'windows' - elif platform.system() == 'Darwin': - return 'mac' - elif platform.system() == 'Linux': - return 'linux' - else: - return 'posix' + if platform.system() == 'Windows': + return 'windows' + elif platform.system()[:7] == 'MSYS_NT': + return 'windows' + elif platform.system() == 'Darwin': + return 'mac' + elif platform.system() == 'Linux': + return 'linux' + else: + return 'posix' # setup a signal handler so that signal.pause registers 'something' # when a child finishes # not using futures and threading to avoid a dependency on subprocess32 if platform_string() == 'windows': - pass -else: - def alarm_handler(unused_signum, unused_frame): pass +else: - signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None) - signal.signal(signal.SIGALRM, alarm_handler) + def alarm_handler(unused_signum, unused_frame): + pass + signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None) + signal.signal(signal.SIGALRM, alarm_handler) _SUCCESS = object() _FAILURE = object() _RUNNING = object() _KILLED = object() - _COLORS = { - 'red': [ 31, 0 ], - 'green': [ 32, 0 ], - 'yellow': [ 33, 0 ], - 'lightgray': [ 37, 0], - 'gray': [ 30, 1 ], - 'purple': [ 35, 0 ], - 'cyan': [ 36, 0 ] - } - + 'red': [31, 0], + 'green': [32, 0], + 'yellow': [33, 0], + 'lightgray': [37, 0], + 'gray': [30, 1], + 'purple': [35, 0], + 'cyan': [36, 0] +} _BEGINNING_OF_LINE = '\x1b[0G' _CLEAR_LINE = '\x1b[2K' - _TAG_COLOR = { 'FAILED': 'red', 'FLAKE': 'purple', @@ -111,392 +105,436 @@ _TAG_COLOR = { 'SUCCESS': 'green', 'IDLE': 'gray', 'SKIPPED': 'cyan' - } +} _FORMAT = '%(asctime)-15s %(message)s' logging.basicConfig(level=logging.INFO, format=_FORMAT) def eintr_be_gone(fn): - """Run fn until it doesn't stop because of EINTR""" - while True: - try: - return fn() - except IOError, e: - if e.errno != errno.EINTR: - raise - + """Run fn until it doesn't stop because of EINTR""" + while True: + try: + return fn() + except IOError, e: + if e.errno != errno.EINTR: + raise def message(tag, msg, explanatory_text=None, do_newline=False): - if message.old_tag == tag and message.old_msg == msg and not explanatory_text: - return - message.old_tag = tag - message.old_msg = msg - while True: - try: - if platform_string() == 'windows' or not sys.stdout.isatty(): - if explanatory_text: - logging.info(explanatory_text) - logging.info('%s: %s', tag, msg) - else: - sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( - _BEGINNING_OF_LINE, - _CLEAR_LINE, - '\n%s' % explanatory_text if explanatory_text is not None else '', - _COLORS[_TAG_COLOR[tag]][1], - _COLORS[_TAG_COLOR[tag]][0], - tag, - msg, - '\n' if do_newline or explanatory_text is not None else '')) - sys.stdout.flush() - return - except IOError, e: - if e.errno != errno.EINTR: - raise + if message.old_tag == tag and message.old_msg == msg and not explanatory_text: + return + message.old_tag = tag + message.old_msg = msg + while True: + try: + if platform_string() == 'windows' or not sys.stdout.isatty(): + if explanatory_text: + logging.info(explanatory_text) + logging.info('%s: %s', tag, msg) + else: + sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % ( + _BEGINNING_OF_LINE, _CLEAR_LINE, '\n%s' % explanatory_text + if explanatory_text is not None else '', + _COLORS[_TAG_COLOR[tag]][1], _COLORS[_TAG_COLOR[tag]][0], + tag, msg, '\n' + if do_newline or explanatory_text is not None else '')) + sys.stdout.flush() + return + except IOError, e: + if e.errno != errno.EINTR: + raise + message.old_tag = '' message.old_msg = '' + def which(filename): - if '/' in filename: - return filename - for path in os.environ['PATH'].split(os.pathsep): - if os.path.exists(os.path.join(path, filename)): - return os.path.join(path, filename) - raise Exception('%s not found' % filename) + if '/' in filename: + return filename + for path in os.environ['PATH'].split(os.pathsep): + if os.path.exists(os.path.join(path, filename)): + return os.path.join(path, filename) + raise Exception('%s not found' % filename) class JobSpec(object): - """Specifies what to run for a job.""" - - def __init__(self, cmdline, shortname=None, environ=None, - cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0, - timeout_retries=0, kill_handler=None, cpu_cost=1.0, - verbose_success=False): - """ + """Specifies what to run for a job.""" + + def __init__(self, + cmdline, + shortname=None, + environ=None, + cwd=None, + shell=False, + timeout_seconds=5 * 60, + flake_retries=0, + timeout_retries=0, + kill_handler=None, + cpu_cost=1.0, + verbose_success=False): + """ Arguments: cmdline: a list of arguments to pass as the command line environ: a dictionary of environment variables to set in the child process kill_handler: a handler that will be called whenever job.kill() is invoked cpu_cost: number of cores per second this job needs """ - if environ is None: - environ = {} - self.cmdline = cmdline - self.environ = environ - self.shortname = cmdline[0] if shortname is None else shortname - self.cwd = cwd - self.shell = shell - self.timeout_seconds = timeout_seconds - self.flake_retries = flake_retries - self.timeout_retries = timeout_retries - self.kill_handler = kill_handler - self.cpu_cost = cpu_cost - self.verbose_success = verbose_success - - def identity(self): - return '%r %r' % (self.cmdline, self.environ) - - def __hash__(self): - return hash(self.identity()) - - def __cmp__(self, other): - return self.identity() == other.identity() - - def __repr__(self): - return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, self.cmdline) - - def __str__(self): - return '%s: %s %s' % (self.shortname, - ' '.join('%s=%s' % kv for kv in self.environ.items()), - ' '.join(self.cmdline)) + if environ is None: + environ = {} + self.cmdline = cmdline + self.environ = environ + self.shortname = cmdline[0] if shortname is None else shortname + self.cwd = cwd + self.shell = shell + self.timeout_seconds = timeout_seconds + self.flake_retries = flake_retries + self.timeout_retries = timeout_retries + self.kill_handler = kill_handler + self.cpu_cost = cpu_cost + self.verbose_success = verbose_success + + def identity(self): + return '%r %r' % (self.cmdline, self.environ) + + def __hash__(self): + return hash(self.identity()) + + def __cmp__(self, other): + return self.identity() == other.identity() + + def __repr__(self): + return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, + self.cmdline) + + def __str__(self): + return '%s: %s %s' % (self.shortname, + ' '.join('%s=%s' % kv + for kv in self.environ.items()), + ' '.join(self.cmdline)) class JobResult(object): - def __init__(self): - self.state = 'UNKNOWN' - self.returncode = -1 - self.elapsed_time = 0 - self.num_failures = 0 - self.retries = 0 - self.message = '' - self.cpu_estimated = 1 - self.cpu_measured = 1 + + def __init__(self): + self.state = 'UNKNOWN' + self.returncode = -1 + self.elapsed_time = 0 + self.num_failures = 0 + self.retries = 0 + self.message = '' + self.cpu_estimated = 1 + self.cpu_measured = 1 def read_from_start(f): - f.seek(0) - return f.read() + f.seek(0) + return f.read() class Job(object): - """Manages one job.""" - - def __init__(self, spec, newline_on_success, travis, add_env, - quiet_success=False): - self._spec = spec - self._newline_on_success = newline_on_success - self._travis = travis - self._add_env = add_env.copy() - self._retries = 0 - self._timeout_retries = 0 - self._suppress_failure_message = False - self._quiet_success = quiet_success - if not self._quiet_success: - message('START', spec.shortname, do_newline=self._travis) - self.result = JobResult() - self.start() - - def GetSpec(self): - return self._spec - - def start(self): - self._tempfile = tempfile.TemporaryFile() - env = dict(os.environ) - env.update(self._spec.environ) - env.update(self._add_env) - env = sanitized_environment(env) - self._start = time.time() - cmdline = self._spec.cmdline - # The Unix time command is finicky when used with MSBuild, so we don't use it - # with jobs that run MSBuild. - global measure_cpu_costs - if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]: - cmdline = ['time', '-p'] + cmdline - else: - measure_cpu_costs = False - try_start = lambda: subprocess.Popen(args=cmdline, - stderr=subprocess.STDOUT, - stdout=self._tempfile, - cwd=self._spec.cwd, - shell=self._spec.shell, - env=env) - delay = 0.3 - for i in range(0, 4): - try: - self._process = try_start() - break - except OSError: - message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay)) - time.sleep(delay) - delay *= 2 - else: - self._process = try_start() - self._state = _RUNNING - - def state(self): - """Poll current state of the job. Prints messages at completion.""" - def stdout(self=self): - stdout = read_from_start(self._tempfile) - self.result.message = stdout[-_MAX_RESULT_SIZE:] - return stdout - if self._state == _RUNNING and self._process.poll() is not None: - elapsed = time.time() - self._start - self.result.elapsed_time = elapsed - if self._process.returncode != 0: - if self._retries < self._spec.flake_retries: - message('FLAKE', '%s [ret=%d, pid=%d]' % ( - self._spec.shortname, self._process.returncode, self._process.pid), - stdout(), do_newline=True) - self._retries += 1 - self.result.num_failures += 1 - self.result.retries = self._timeout_retries + self._retries - # NOTE: job is restarted regardless of jobset's max_time setting - self.start() - else: - self._state = _FAILURE - if not self._suppress_failure_message: - message('FAILED', '%s [ret=%d, pid=%d, time=%.1fsec]' % ( - self._spec.shortname, self._process.returncode, self._process.pid, elapsed), - stdout(), do_newline=True) - self.result.state = 'FAILED' - self.result.num_failures += 1 - self.result.returncode = self._process.returncode - else: - self._state = _SUCCESS - measurement = '' - if measure_cpu_costs: - m = re.search(r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)', stdout()) - real = float(m.group(1)) - user = float(m.group(2)) - sys = float(m.group(3)) - if real > 0.5: - cores = (user + sys) / real - self.result.cpu_measured = float('%.01f' % cores) - self.result.cpu_estimated = float('%.01f' % self._spec.cpu_cost) - measurement = '; cpu_cost=%.01f; estimated=%.01f' % (self.result.cpu_measured, self.result.cpu_estimated) + """Manages one job.""" + + def __init__(self, + spec, + newline_on_success, + travis, + add_env, + quiet_success=False): + self._spec = spec + self._newline_on_success = newline_on_success + self._travis = travis + self._add_env = add_env.copy() + self._retries = 0 + self._timeout_retries = 0 + self._suppress_failure_message = False + self._quiet_success = quiet_success if not self._quiet_success: - message('PASSED', '%s [time=%.1fsec, retries=%d:%d%s]' % ( - self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement), - stdout() if self._spec.verbose_success else None, - do_newline=self._newline_on_success or self._travis) - self.result.state = 'PASSED' - elif (self._state == _RUNNING and - self._spec.timeout_seconds is not None and - time.time() - self._start > self._spec.timeout_seconds): - elapsed = time.time() - self._start - self.result.elapsed_time = elapsed - if self._timeout_retries < self._spec.timeout_retries: - message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True) - self._timeout_retries += 1 - self.result.num_failures += 1 - self.result.retries = self._timeout_retries + self._retries - if self._spec.kill_handler: - self._spec.kill_handler(self) - self._process.terminate() - # NOTE: job is restarted regardless of jobset's max_time setting + message('START', spec.shortname, do_newline=self._travis) + self.result = JobResult() self.start() - else: - message('TIMEOUT', '%s [pid=%d, time=%.1fsec]' % (self._spec.shortname, self._process.pid, elapsed), stdout(), do_newline=True) - self.kill() - self.result.state = 'TIMEOUT' - self.result.num_failures += 1 - return self._state - def kill(self): - if self._state == _RUNNING: - self._state = _KILLED - if self._spec.kill_handler: - self._spec.kill_handler(self) - self._process.terminate() - - def suppress_failure_message(self): - self._suppress_failure_message = True + def GetSpec(self): + return self._spec + + def start(self): + self._tempfile = tempfile.TemporaryFile() + env = dict(os.environ) + env.update(self._spec.environ) + env.update(self._add_env) + env = sanitized_environment(env) + self._start = time.time() + cmdline = self._spec.cmdline + # The Unix time command is finicky when used with MSBuild, so we don't use it + # with jobs that run MSBuild. + global measure_cpu_costs + if measure_cpu_costs and not 'vsprojects\\build' in cmdline[0]: + cmdline = ['time', '-p'] + cmdline + else: + measure_cpu_costs = False + try_start = lambda: subprocess.Popen(args=cmdline, + stderr=subprocess.STDOUT, + stdout=self._tempfile, + cwd=self._spec.cwd, + shell=self._spec.shell, + env=env) + delay = 0.3 + for i in range(0, 4): + try: + self._process = try_start() + break + except OSError: + message('WARNING', 'Failed to start %s, retrying in %f seconds' + % (self._spec.shortname, delay)) + time.sleep(delay) + delay *= 2 + else: + self._process = try_start() + self._state = _RUNNING + + def state(self): + """Poll current state of the job. Prints messages at completion.""" + + def stdout(self=self): + stdout = read_from_start(self._tempfile) + self.result.message = stdout[-_MAX_RESULT_SIZE:] + return stdout + + if self._state == _RUNNING and self._process.poll() is not None: + elapsed = time.time() - self._start + self.result.elapsed_time = elapsed + if self._process.returncode != 0: + if self._retries < self._spec.flake_retries: + message( + 'FLAKE', + '%s [ret=%d, pid=%d]' % + (self._spec.shortname, self._process.returncode, + self._process.pid), + stdout(), + do_newline=True) + self._retries += 1 + self.result.num_failures += 1 + self.result.retries = self._timeout_retries + self._retries + # NOTE: job is restarted regardless of jobset's max_time setting + self.start() + else: + self._state = _FAILURE + if not self._suppress_failure_message: + message( + 'FAILED', + '%s [ret=%d, pid=%d, time=%.1fsec]' % + (self._spec.shortname, self._process.returncode, + self._process.pid, elapsed), + stdout(), + do_newline=True) + self.result.state = 'FAILED' + self.result.num_failures += 1 + self.result.returncode = self._process.returncode + else: + self._state = _SUCCESS + measurement = '' + if measure_cpu_costs: + m = re.search( + r'real\s+([0-9.]+)\nuser\s+([0-9.]+)\nsys\s+([0-9.]+)', + stdout()) + real = float(m.group(1)) + user = float(m.group(2)) + sys = float(m.group(3)) + if real > 0.5: + cores = (user + sys) / real + self.result.cpu_measured = float('%.01f' % cores) + self.result.cpu_estimated = float('%.01f' % + self._spec.cpu_cost) + measurement = '; cpu_cost=%.01f; estimated=%.01f' % ( + self.result.cpu_measured, self.result.cpu_estimated) + if not self._quiet_success: + message( + 'PASSED', + '%s [time=%.1fsec, retries=%d:%d%s]' % + (self._spec.shortname, elapsed, self._retries, + self._timeout_retries, measurement), + stdout() if self._spec.verbose_success else None, + do_newline=self._newline_on_success or self._travis) + self.result.state = 'PASSED' + elif (self._state == _RUNNING and + self._spec.timeout_seconds is not None and + time.time() - self._start > self._spec.timeout_seconds): + elapsed = time.time() - self._start + self.result.elapsed_time = elapsed + if self._timeout_retries < self._spec.timeout_retries: + message( + 'TIMEOUT_FLAKE', + '%s [pid=%d]' % (self._spec.shortname, self._process.pid), + stdout(), + do_newline=True) + self._timeout_retries += 1 + self.result.num_failures += 1 + self.result.retries = self._timeout_retries + self._retries + if self._spec.kill_handler: + self._spec.kill_handler(self) + self._process.terminate() + # NOTE: job is restarted regardless of jobset's max_time setting + self.start() + else: + message( + 'TIMEOUT', + '%s [pid=%d, time=%.1fsec]' % + (self._spec.shortname, self._process.pid, elapsed), + stdout(), + do_newline=True) + self.kill() + self.result.state = 'TIMEOUT' + self.result.num_failures += 1 + return self._state + + def kill(self): + if self._state == _RUNNING: + self._state = _KILLED + if self._spec.kill_handler: + self._spec.kill_handler(self) + self._process.terminate() + + def suppress_failure_message(self): + self._suppress_failure_message = True class Jobset(object): - """Manages one run of jobs.""" - - def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic, newline_on_success, travis, - stop_on_failure, add_env, quiet_success, max_time): - self._running = set() - self._check_cancelled = check_cancelled - self._cancelled = False - self._failures = 0 - self._completed = 0 - self._maxjobs = maxjobs - self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic - self._newline_on_success = newline_on_success - self._travis = travis - self._stop_on_failure = stop_on_failure - self._add_env = add_env - self._quiet_success = quiet_success - self._max_time = max_time - self.resultset = {} - self._remaining = None - self._start_time = time.time() - - def set_remaining(self, remaining): - self._remaining = remaining - - def get_num_failures(self): - return self._failures - - def cpu_cost(self): - c = 0 - for job in self._running: - c += job._spec.cpu_cost - return c - - def start(self, spec): - """Start a job. Return True on success, False on failure.""" - while True: - if self._max_time > 0 and time.time() - self._start_time > self._max_time: - skipped_job_result = JobResult() - skipped_job_result.state = 'SKIPPED' - message('SKIPPED', spec.shortname, do_newline=True) - self.resultset[spec.shortname] = [skipped_job_result] + """Manages one run of jobs.""" + + def __init__(self, check_cancelled, maxjobs, maxjobs_cpu_agnostic, + newline_on_success, travis, stop_on_failure, add_env, + quiet_success, max_time): + self._running = set() + self._check_cancelled = check_cancelled + self._cancelled = False + self._failures = 0 + self._completed = 0 + self._maxjobs = maxjobs + self._maxjobs_cpu_agnostic = maxjobs_cpu_agnostic + self._newline_on_success = newline_on_success + self._travis = travis + self._stop_on_failure = stop_on_failure + self._add_env = add_env + self._quiet_success = quiet_success + self._max_time = max_time + self.resultset = {} + self._remaining = None + self._start_time = time.time() + + def set_remaining(self, remaining): + self._remaining = remaining + + def get_num_failures(self): + return self._failures + + def cpu_cost(self): + c = 0 + for job in self._running: + c += job._spec.cpu_cost + return c + + def start(self, spec): + """Start a job. Return True on success, False on failure.""" + while True: + if self._max_time > 0 and time.time( + ) - self._start_time > self._max_time: + skipped_job_result = JobResult() + skipped_job_result.state = 'SKIPPED' + message('SKIPPED', spec.shortname, do_newline=True) + self.resultset[spec.shortname] = [skipped_job_result] + return True + if self.cancelled(): return False + current_cpu_cost = self.cpu_cost() + if current_cpu_cost == 0: break + if current_cpu_cost + spec.cpu_cost <= self._maxjobs: + if len(self._running) < self._maxjobs_cpu_agnostic: + break + self.reap(spec.shortname, spec.cpu_cost) + if self.cancelled(): return False + job = Job(spec, self._newline_on_success, self._travis, self._add_env, + self._quiet_success) + self._running.add(job) + if job.GetSpec().shortname not in self.resultset: + self.resultset[job.GetSpec().shortname] = [] return True - if self.cancelled(): return False - current_cpu_cost = self.cpu_cost() - if current_cpu_cost == 0: break - if current_cpu_cost + spec.cpu_cost <= self._maxjobs: - if len(self._running) < self._maxjobs_cpu_agnostic: - break - self.reap(spec.shortname, spec.cpu_cost) - if self.cancelled(): return False - job = Job(spec, - self._newline_on_success, - self._travis, - self._add_env, - self._quiet_success) - self._running.add(job) - if job.GetSpec().shortname not in self.resultset: - self.resultset[job.GetSpec().shortname] = [] - return True - - def reap(self, waiting_for=None, waiting_for_cost=None): - """Collect the dead jobs.""" - while self._running: - dead = set() - for job in self._running: - st = eintr_be_gone(lambda: job.state()) - if st == _RUNNING: continue - if st == _FAILURE or st == _KILLED: - self._failures += 1 - if self._stop_on_failure: - self._cancelled = True + + def reap(self, waiting_for=None, waiting_for_cost=None): + """Collect the dead jobs.""" + while self._running: + dead = set() for job in self._running: - job.kill() - dead.add(job) - break - for job in dead: - self._completed += 1 - if not self._quiet_success or job.result.state != 'PASSED': - self.resultset[job.GetSpec().shortname].append(job.result) - self._running.remove(job) - if dead: return - if not self._travis and platform_string() != 'windows': - rstr = '' if self._remaining is None else '%d queued, ' % self._remaining - if self._remaining is not None and self._completed > 0: - now = time.time() - sofar = now - self._start_time - remaining = sofar / self._completed * (self._remaining + len(self._running)) - rstr = 'ETA %.1f sec; %s' % (remaining, rstr) - if waiting_for is not None: - wstr = ' next: %s @ %.2f cpu' % (waiting_for, waiting_for_cost) - else: - wstr = '' - message('WAITING', '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % ( - rstr, len(self._running), self._completed, self._failures, self.cpu_cost(), wstr)) - if platform_string() == 'windows': - time.sleep(0.1) - else: - signal.alarm(10) - signal.pause() - - def cancelled(self): - """Poll for cancellation.""" - if self._cancelled: return True - if not self._check_cancelled(): return False - for job in self._running: - job.kill() - self._cancelled = True - return True - - def finish(self): - while self._running: - if self.cancelled(): pass # poll cancellation - self.reap() - if platform_string() != 'windows': - signal.alarm(0) - return not self.cancelled() and self._failures == 0 + st = eintr_be_gone(lambda: job.state()) + if st == _RUNNING: continue + if st == _FAILURE or st == _KILLED: + self._failures += 1 + if self._stop_on_failure: + self._cancelled = True + for job in self._running: + job.kill() + dead.add(job) + break + for job in dead: + self._completed += 1 + if not self._quiet_success or job.result.state != 'PASSED': + self.resultset[job.GetSpec().shortname].append(job.result) + self._running.remove(job) + if dead: return + if not self._travis and platform_string() != 'windows': + rstr = '' if self._remaining is None else '%d queued, ' % self._remaining + if self._remaining is not None and self._completed > 0: + now = time.time() + sofar = now - self._start_time + remaining = sofar / self._completed * ( + self._remaining + len(self._running)) + rstr = 'ETA %.1f sec; %s' % (remaining, rstr) + if waiting_for is not None: + wstr = ' next: %s @ %.2f cpu' % (waiting_for, + waiting_for_cost) + else: + wstr = '' + message( + 'WAITING', + '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % + (rstr, len(self._running), self._completed, self._failures, + self.cpu_cost(), wstr)) + if platform_string() == 'windows': + time.sleep(0.1) + else: + signal.alarm(10) + signal.pause() + + def cancelled(self): + """Poll for cancellation.""" + if self._cancelled: return True + if not self._check_cancelled(): return False + for job in self._running: + job.kill() + self._cancelled = True + return True + + def finish(self): + while self._running: + if self.cancelled(): pass # poll cancellation + self.reap() + if platform_string() != 'windows': + signal.alarm(0) + return not self.cancelled() and self._failures == 0 def _never_cancelled(): - return False + return False def tag_remaining(xs): - staging = [] - for x in xs: - staging.append(x) - if len(staging) > 5000: - yield (staging.pop(0), None) - n = len(staging) - for i, x in enumerate(staging): - yield (x, n - i - 1) + staging = [] + for x in xs: + staging.append(x) + if len(staging) > 5000: + yield (staging.pop(0), None) + n = len(staging) + for i, x in enumerate(staging): + yield (x, n - i - 1) def run(cmdlines, @@ -511,23 +549,23 @@ def run(cmdlines, skip_jobs=False, quiet_success=False, max_time=-1): - if skip_jobs: - resultset = {} - skipped_job_result = JobResult() - skipped_job_result.state = 'SKIPPED' - for job in cmdlines: - message('SKIPPED', job.shortname, do_newline=True) - resultset[job.shortname] = [skipped_job_result] - return 0, resultset - js = Jobset(check_cancelled, - maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS, - maxjobs_cpu_agnostic if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS, - newline_on_success, travis, stop_on_failure, add_env, - quiet_success, max_time) - for cmdline, remaining in tag_remaining(cmdlines): - if not js.start(cmdline): - break - if remaining is not None: - js.set_remaining(remaining) - js.finish() - return js.get_num_failures(), js.resultset + if skip_jobs: + resultset = {} + skipped_job_result = JobResult() + skipped_job_result.state = 'SKIPPED' + for job in cmdlines: + message('SKIPPED', job.shortname, do_newline=True) + resultset[job.shortname] = [skipped_job_result] + return 0, resultset + js = Jobset(check_cancelled, maxjobs if maxjobs is not None else + _DEFAULT_MAX_JOBS, maxjobs_cpu_agnostic + if maxjobs_cpu_agnostic is not None else _DEFAULT_MAX_JOBS, + newline_on_success, travis, stop_on_failure, add_env, + quiet_success, max_time) + for cmdline, remaining in tag_remaining(cmdlines): + if not js.start(cmdline): + break + if remaining is not None: + js.set_remaining(remaining) + js.finish() + return js.get_num_failures(), js.resultset diff --git a/tools/run_tests/python_utils/port_server.py b/tools/run_tests/python_utils/port_server.py index e8ac71af8d7..15f55f46dd8 100755 --- a/tools/run_tests/python_utils/port_server.py +++ b/tools/run_tests/python_utils/port_server.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Manage TCP ports for unit tests; started by run_tests.py""" import argparse @@ -27,17 +26,14 @@ from SocketServer import ThreadingMixIn import threading import platform - # increment this number whenever making a change to ensure that # the changes are picked up by running CI servers # note that all changes must be backwards compatible _MY_VERSION = 20 - if len(sys.argv) == 2 and sys.argv[1] == 'dump_version': - print _MY_VERSION - sys.exit(0) - + print _MY_VERSION + sys.exit(0) argp = argparse.ArgumentParser(description='Server for httpcli_test') argp.add_argument('-p', '--port', default=12345, type=int) @@ -45,11 +41,11 @@ argp.add_argument('-l', '--logfile', default=None, type=str) args = argp.parse_args() if args.logfile is not None: - sys.stdin.close() - sys.stderr.close() - sys.stdout.close() - sys.stderr = open(args.logfile, 'w') - sys.stdout = sys.stderr + sys.stdin.close() + sys.stderr.close() + sys.stdout.close() + sys.stderr = open(args.logfile, 'w') + sys.stdout = sys.stderr print 'port server running on port %d' % args.port @@ -61,74 +57,81 @@ mu = threading.Lock() # https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these # ports is used in a Cronet test, the test would fail (see issue #12149). These # ports must be excluded from pool. -cronet_restricted_ports = [1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, - 42, 43, 53, 77, 79, 87, 95, 101, 102, 103, 104, 109, - 110, 111, 113, 115, 117, 119, 123, 135, 139, 143, - 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, - 532, 540, 556, 563, 587, 601, 636, 993, 995, 2049, - 3659, 4045, 6000, 6665, 6666, 6667, 6668, 6669, 6697] +cronet_restricted_ports = [ + 1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, 42, 43, 53, 77, 79, 87, + 95, 101, 102, 103, 104, 109, 110, 111, 113, 115, 117, 119, 123, 135, 139, + 143, 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, 532, 540, 556, 563, + 587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665, 6666, 6667, 6668, + 6669, 6697 +] + def can_connect(port): - # this test is only really useful on unices where SO_REUSE_PORT is available - # so on Windows, where this test is expensive, skip it - if platform.system() == 'Windows': return False - s = socket.socket() - try: - s.connect(('localhost', port)) - return True - except socket.error, e: - return False - finally: - s.close() + # this test is only really useful on unices where SO_REUSE_PORT is available + # so on Windows, where this test is expensive, skip it + if platform.system() == 'Windows': return False + s = socket.socket() + try: + s.connect(('localhost', port)) + return True + except socket.error, e: + return False + finally: + s.close() + def can_bind(port, proto): - s = socket.socket(proto, socket.SOCK_STREAM) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - s.bind(('localhost', port)) - return True - except socket.error, e: - return False - finally: - s.close() + s = socket.socket(proto, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + s.bind(('localhost', port)) + return True + except socket.error, e: + return False + finally: + s.close() def refill_pool(max_timeout, req): - """Scan for ports not marked for being in use""" - chk = [port for port in list(range(1025, 32766)) if port not in cronet_restricted_ports] - random.shuffle(chk) - for i in chk: - if len(pool) > 100: break - if i in in_use: - age = time.time() - in_use[i] - if age < max_timeout: - continue - req.log_message("kill old request %d" % i) - del in_use[i] - if can_bind(i, socket.AF_INET) and can_bind(i, socket.AF_INET6) and not can_connect(i): - req.log_message("found available port %d" % i) - pool.append(i) + """Scan for ports not marked for being in use""" + chk = [ + port for port in list(range(1025, 32766)) + if port not in cronet_restricted_ports + ] + random.shuffle(chk) + for i in chk: + if len(pool) > 100: break + if i in in_use: + age = time.time() - in_use[i] + if age < max_timeout: + continue + req.log_message("kill old request %d" % i) + del in_use[i] + if can_bind(i, socket.AF_INET) and can_bind( + i, socket.AF_INET6) and not can_connect(i): + req.log_message("found available port %d" % i) + pool.append(i) def allocate_port(req): - global pool - global in_use - global mu - mu.acquire() - max_timeout = 600 - while not pool: - refill_pool(max_timeout, req) - if not pool: - req.log_message("failed to find ports: retrying soon") - mu.release() - time.sleep(1) - mu.acquire() - max_timeout /= 2 - port = pool[0] - pool = pool[1:] - in_use[port] = time.time() - mu.release() - return port + global pool + global in_use + global mu + mu.acquire() + max_timeout = 600 + while not pool: + refill_pool(max_timeout, req) + if not pool: + req.log_message("failed to find ports: retrying soon") + mu.release() + time.sleep(1) + mu.acquire() + max_timeout /= 2 + port = pool[0] + pool = pool[1:] + in_use[port] = time.time() + mu.release() + return port keep_running = True @@ -136,61 +139,67 @@ keep_running = True class Handler(BaseHTTPRequestHandler): - def setup(self): - # If the client is unreachable for 5 seconds, close the connection - self.timeout = 5 - BaseHTTPRequestHandler.setup(self) + def setup(self): + # If the client is unreachable for 5 seconds, close the connection + self.timeout = 5 + BaseHTTPRequestHandler.setup(self) + + def do_GET(self): + global keep_running + global mu + if self.path == '/get': + # allocate a new port, it will stay bound for ten minutes and until + # it's unused + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + p = allocate_port(self) + self.log_message('allocated port %d' % p) + self.wfile.write('%d' % p) + elif self.path[0:6] == '/drop/': + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + p = int(self.path[6:]) + mu.acquire() + if p in in_use: + del in_use[p] + pool.append(p) + k = 'known' + else: + k = 'unknown' + mu.release() + self.log_message('drop %s port %d' % (k, p)) + elif self.path == '/version_number': + # fetch a version string and the current process pid + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + self.wfile.write(_MY_VERSION) + elif self.path == '/dump': + # yaml module is not installed on Macs and Windows machines by default + # so we import it lazily (/dump action is only used for debugging) + import yaml + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.end_headers() + mu.acquire() + now = time.time() + out = yaml.dump( + { + 'pool': pool, + 'in_use': dict((k, now - v) for k, v in in_use.items()) + }) + mu.release() + self.wfile.write(out) + elif self.path == '/quitquitquit': + self.send_response(200) + self.end_headers() + self.server.shutdown() - def do_GET(self): - global keep_running - global mu - if self.path == '/get': - # allocate a new port, it will stay bound for ten minutes and until - # it's unused - self.send_response(200) - self.send_header('Content-Type', 'text/plain') - self.end_headers() - p = allocate_port(self) - self.log_message('allocated port %d' % p) - self.wfile.write('%d' % p) - elif self.path[0:6] == '/drop/': - self.send_response(200) - self.send_header('Content-Type', 'text/plain') - self.end_headers() - p = int(self.path[6:]) - mu.acquire() - if p in in_use: - del in_use[p] - pool.append(p) - k = 'known' - else: - k = 'unknown' - mu.release() - self.log_message('drop %s port %d' % (k, p)) - elif self.path == '/version_number': - # fetch a version string and the current process pid - self.send_response(200) - self.send_header('Content-Type', 'text/plain') - self.end_headers() - self.wfile.write(_MY_VERSION) - elif self.path == '/dump': - # yaml module is not installed on Macs and Windows machines by default - # so we import it lazily (/dump action is only used for debugging) - import yaml - self.send_response(200) - self.send_header('Content-Type', 'text/plain') - self.end_headers() - mu.acquire() - now = time.time() - out = yaml.dump({'pool': pool, 'in_use': dict((k, now - v) for k, v in in_use.items())}) - mu.release() - self.wfile.write(out) - elif self.path == '/quitquitquit': - self.send_response(200) - self.end_headers() - self.server.shutdown() class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): - """Handle requests in a separate thread""" + """Handle requests in a separate thread""" + ThreadedHTTPServer(('', args.port), Handler).serve_forever() diff --git a/tools/run_tests/python_utils/report_utils.py b/tools/run_tests/python_utils/report_utils.py index a3867808b54..e4fddb8a7da 100644 --- a/tools/run_tests/python_utils/report_utils.py +++ b/tools/run_tests/python_utils/report_utils.py @@ -11,17 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Generate XML and HTML test reports.""" from __future__ import print_function try: - from mako.runtime import Context - from mako.template import Template - from mako import exceptions + from mako.runtime import Context + from mako.template import Template + from mako import exceptions except (ImportError): - pass # Mako not installed but it is ok. + pass # Mako not installed but it is ok. import datetime import os import string @@ -30,111 +29,127 @@ import six def _filter_msg(msg, output_format): - """Filters out nonprintable and illegal characters from the message.""" - if output_format in ['XML', 'HTML']: - # keep whitespaces but remove formfeed and vertical tab characters - # that make XML report unparseable. - filtered_msg = filter( - lambda x: x in string.printable and x != '\f' and x != '\v', - msg.decode('UTF-8', 'ignore')) - if output_format == 'HTML': - filtered_msg = filtered_msg.replace('"', '"') - return filtered_msg - else: - return msg + """Filters out nonprintable and illegal characters from the message.""" + if output_format in ['XML', 'HTML']: + # keep whitespaces but remove formfeed and vertical tab characters + # that make XML report unparseable. + filtered_msg = filter( + lambda x: x in string.printable and x != '\f' and x != '\v', + msg.decode('UTF-8', 'ignore')) + if output_format == 'HTML': + filtered_msg = filtered_msg.replace('"', '"') + return filtered_msg + else: + return msg def new_junit_xml_tree(): - return ET.ElementTree(ET.Element('testsuites')) + return ET.ElementTree(ET.Element('testsuites')) + -def render_junit_xml_report(resultset, report_file, suite_package='grpc', +def render_junit_xml_report(resultset, + report_file, + suite_package='grpc', suite_name='tests'): - """Generate JUnit-like XML report.""" - tree = new_junit_xml_tree() - append_junit_xml_results(tree, resultset, suite_package, suite_name, '1') - create_xml_report_file(tree, report_file) + """Generate JUnit-like XML report.""" + tree = new_junit_xml_tree() + append_junit_xml_results(tree, resultset, suite_package, suite_name, '1') + create_xml_report_file(tree, report_file) + def create_xml_report_file(tree, report_file): - """Generate JUnit-like report file from xml tree .""" - # ensure the report directory exists - report_dir = os.path.dirname(os.path.abspath(report_file)) - if not os.path.exists(report_dir): - os.makedirs(report_dir) - tree.write(report_file, encoding='UTF-8') + """Generate JUnit-like report file from xml tree .""" + # ensure the report directory exists + report_dir = os.path.dirname(os.path.abspath(report_file)) + if not os.path.exists(report_dir): + os.makedirs(report_dir) + tree.write(report_file, encoding='UTF-8') + def append_junit_xml_results(tree, resultset, suite_package, suite_name, id): - """Append a JUnit-like XML report tree with test results as a new suite.""" - testsuite = ET.SubElement(tree.getroot(), 'testsuite', - id=id, package=suite_package, name=suite_name, - timestamp=datetime.datetime.now().isoformat()) - failure_count = 0 - error_count = 0 - for shortname, results in six.iteritems(resultset): - for result in results: - xml_test = ET.SubElement(testsuite, 'testcase', name=shortname) - if result.elapsed_time: - xml_test.set('time', str(result.elapsed_time)) - filtered_msg = _filter_msg(result.message, 'XML') - if result.state == 'FAILED': - ET.SubElement(xml_test, 'failure', message='Failure').text = filtered_msg - failure_count += 1 - elif result.state == 'TIMEOUT': - ET.SubElement(xml_test, 'error', message='Timeout').text = filtered_msg - error_count += 1 - elif result.state == 'SKIPPED': - ET.SubElement(xml_test, 'skipped', message='Skipped') - testsuite.set('failures', str(failure_count)) - testsuite.set('errors', str(error_count)) - -def render_interop_html_report( - client_langs, server_langs, test_cases, auth_test_cases, http2_cases, - http2_server_cases, resultset, - num_failures, cloud_to_prod, prod_servers, http2_interop): - """Generate HTML report for interop tests.""" - template_file = 'tools/run_tests/interop/interop_html_report.template' - try: - mytemplate = Template(filename=template_file, format_exceptions=True) - except NameError: - print('Mako template is not installed. Skipping HTML report generation.') - return - except IOError as e: - print('Failed to find the template %s: %s' % (template_file, e)) - return - - sorted_test_cases = sorted(test_cases) - sorted_auth_test_cases = sorted(auth_test_cases) - sorted_http2_cases = sorted(http2_cases) - sorted_http2_server_cases = sorted(http2_server_cases) - sorted_client_langs = sorted(client_langs) - sorted_server_langs = sorted(server_langs) - sorted_prod_servers = sorted(prod_servers) - - args = {'client_langs': sorted_client_langs, - 'server_langs': sorted_server_langs, - 'test_cases': sorted_test_cases, - 'auth_test_cases': sorted_auth_test_cases, - 'http2_cases': sorted_http2_cases, - 'http2_server_cases': sorted_http2_server_cases, - 'resultset': resultset, - 'num_failures': num_failures, - 'cloud_to_prod': cloud_to_prod, - 'prod_servers': sorted_prod_servers, - 'http2_interop': http2_interop} - - html_report_out_dir = 'reports' - if not os.path.exists(html_report_out_dir): - os.mkdir(html_report_out_dir) - html_file_path = os.path.join(html_report_out_dir, 'index.html') - try: - with open(html_file_path, 'w') as output_file: - mytemplate.render_context(Context(output_file, **args)) - except: - print(exceptions.text_error_template().render()) - raise + """Append a JUnit-like XML report tree with test results as a new suite.""" + testsuite = ET.SubElement( + tree.getroot(), + 'testsuite', + id=id, + package=suite_package, + name=suite_name, + timestamp=datetime.datetime.now().isoformat()) + failure_count = 0 + error_count = 0 + for shortname, results in six.iteritems(resultset): + for result in results: + xml_test = ET.SubElement(testsuite, 'testcase', name=shortname) + if result.elapsed_time: + xml_test.set('time', str(result.elapsed_time)) + filtered_msg = _filter_msg(result.message, 'XML') + if result.state == 'FAILED': + ET.SubElement( + xml_test, 'failure', message='Failure').text = filtered_msg + failure_count += 1 + elif result.state == 'TIMEOUT': + ET.SubElement( + xml_test, 'error', message='Timeout').text = filtered_msg + error_count += 1 + elif result.state == 'SKIPPED': + ET.SubElement(xml_test, 'skipped', message='Skipped') + testsuite.set('failures', str(failure_count)) + testsuite.set('errors', str(error_count)) + + +def render_interop_html_report(client_langs, server_langs, test_cases, + auth_test_cases, http2_cases, http2_server_cases, + resultset, num_failures, cloud_to_prod, + prod_servers, http2_interop): + """Generate HTML report for interop tests.""" + template_file = 'tools/run_tests/interop/interop_html_report.template' + try: + mytemplate = Template(filename=template_file, format_exceptions=True) + except NameError: + print( + 'Mako template is not installed. Skipping HTML report generation.') + return + except IOError as e: + print('Failed to find the template %s: %s' % (template_file, e)) + return + + sorted_test_cases = sorted(test_cases) + sorted_auth_test_cases = sorted(auth_test_cases) + sorted_http2_cases = sorted(http2_cases) + sorted_http2_server_cases = sorted(http2_server_cases) + sorted_client_langs = sorted(client_langs) + sorted_server_langs = sorted(server_langs) + sorted_prod_servers = sorted(prod_servers) + + args = { + 'client_langs': sorted_client_langs, + 'server_langs': sorted_server_langs, + 'test_cases': sorted_test_cases, + 'auth_test_cases': sorted_auth_test_cases, + 'http2_cases': sorted_http2_cases, + 'http2_server_cases': sorted_http2_server_cases, + 'resultset': resultset, + 'num_failures': num_failures, + 'cloud_to_prod': cloud_to_prod, + 'prod_servers': sorted_prod_servers, + 'http2_interop': http2_interop + } + + html_report_out_dir = 'reports' + if not os.path.exists(html_report_out_dir): + os.mkdir(html_report_out_dir) + html_file_path = os.path.join(html_report_out_dir, 'index.html') + try: + with open(html_file_path, 'w') as output_file: + mytemplate.render_context(Context(output_file, **args)) + except: + print(exceptions.text_error_template().render()) + raise + def render_perf_profiling_results(output_filepath, profile_names): - with open(output_filepath, 'w') as output_file: - output_file.write('\n') + with open(output_filepath, 'w') as output_file: + output_file.write('\n') diff --git a/tools/run_tests/python_utils/start_port_server.py b/tools/run_tests/python_utils/start_port_server.py index 786103ccdfa..5572cdcfe7c 100644 --- a/tools/run_tests/python_utils/start_port_server.py +++ b/tools/run_tests/python_utils/start_port_server.py @@ -22,10 +22,10 @@ import sys import tempfile import time - # must be synchronized with test/core/utils/port_server_client.h _PORT_SERVER_PORT = 32766 + def start_port_server(): # check if a compatible port server is running # if incompatible (version mismatch) ==> start a new one @@ -33,9 +33,8 @@ def start_port_server(): # otherwise, leave it up try: version = int( - urllib.urlopen( - 'http://localhost:%d/version_number' % - _PORT_SERVER_PORT).read()) + urllib.urlopen('http://localhost:%d/version_number' % + _PORT_SERVER_PORT).read()) logging.info('detected port server running version %d', version) running = True except Exception as e: @@ -92,8 +91,8 @@ def start_port_server(): # try one final time: maybe another build managed to start one time.sleep(1) try: - urllib.urlopen( - 'http://localhost:%d/get' % _PORT_SERVER_PORT).read() + urllib.urlopen('http://localhost:%d/get' % + _PORT_SERVER_PORT).read() logging.info( 'last ditch attempt to contact port server succeeded') break diff --git a/tools/run_tests/python_utils/upload_test_results.py b/tools/run_tests/python_utils/upload_test_results.py index ea97bc0aec1..9eb8e2a8620 100644 --- a/tools/run_tests/python_utils/upload_test_results.py +++ b/tools/run_tests/python_utils/upload_test_results.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Helper to upload Jenkins test results to BQ""" from __future__ import print_function @@ -23,8 +22,8 @@ import sys import time import uuid -gcp_utils_dir = os.path.abspath(os.path.join( - os.path.dirname(__file__), '../../gcp/utils')) +gcp_utils_dir = os.path.abspath( + os.path.join(os.path.dirname(__file__), '../../gcp/utils')) sys.path.append(gcp_utils_dir) import big_query_utils @@ -35,55 +34,57 @@ _EXPIRATION_MS = 90 * 24 * 60 * 60 * 1000 _PARTITION_TYPE = 'DAY' _PROJECT_ID = 'grpc-testing' _RESULTS_SCHEMA = [ - ('job_name', 'STRING', 'Name of Jenkins job'), - ('build_id', 'INTEGER', 'Build ID of Jenkins job'), - ('build_url', 'STRING', 'URL of Jenkins job'), - ('test_name', 'STRING', 'Individual test name'), - ('language', 'STRING', 'Language of test'), - ('platform', 'STRING', 'Platform used for test'), - ('config', 'STRING', 'Config used for test'), - ('compiler', 'STRING', 'Compiler used for test'), - ('iomgr_platform', 'STRING', 'Iomgr used for test'), - ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'), - ('timestamp', 'TIMESTAMP', 'Timestamp of test run'), - ('elapsed_time', 'FLOAT', 'How long test took to run'), - ('cpu_estimated', 'FLOAT', 'Estimated CPU usage of test'), - ('cpu_measured', 'FLOAT', 'Actual CPU usage of test'), - ('return_code', 'INTEGER', 'Exit code of test'), + ('job_name', 'STRING', 'Name of Jenkins job'), + ('build_id', 'INTEGER', 'Build ID of Jenkins job'), + ('build_url', 'STRING', 'URL of Jenkins job'), + ('test_name', 'STRING', 'Individual test name'), + ('language', 'STRING', 'Language of test'), + ('platform', 'STRING', 'Platform used for test'), + ('config', 'STRING', 'Config used for test'), + ('compiler', 'STRING', 'Compiler used for test'), + ('iomgr_platform', 'STRING', 'Iomgr used for test'), + ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'), + ('timestamp', 'TIMESTAMP', 'Timestamp of test run'), + ('elapsed_time', 'FLOAT', 'How long test took to run'), + ('cpu_estimated', 'FLOAT', 'Estimated CPU usage of test'), + ('cpu_measured', 'FLOAT', 'Actual CPU usage of test'), + ('return_code', 'INTEGER', 'Exit code of test'), ] _INTEROP_RESULTS_SCHEMA = [ - ('job_name', 'STRING', 'Name of Jenkins/Kokoro job'), - ('build_id', 'INTEGER', 'Build ID of Jenkins/Kokoro job'), - ('build_url', 'STRING', 'URL of Jenkins/Kokoro job'), - ('test_name', 'STRING', 'Unique test name combining client, server, and test_name'), - ('suite', 'STRING', 'Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth'), - ('client', 'STRING', 'Client language'), - ('server', 'STRING', 'Server host name'), - ('test_case', 'STRING', 'Name of test case'), - ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'), - ('timestamp', 'TIMESTAMP', 'Timestamp of test run'), - ('elapsed_time', 'FLOAT', 'How long test took to run'), + ('job_name', 'STRING', 'Name of Jenkins/Kokoro job'), + ('build_id', 'INTEGER', 'Build ID of Jenkins/Kokoro job'), + ('build_url', 'STRING', 'URL of Jenkins/Kokoro job'), + ('test_name', 'STRING', + 'Unique test name combining client, server, and test_name'), + ('suite', 'STRING', + 'Test suite: cloud_to_cloud, cloud_to_prod, or cloud_to_prod_auth'), + ('client', 'STRING', 'Client language'), + ('server', 'STRING', 'Server host name'), + ('test_case', 'STRING', 'Name of test case'), + ('result', 'STRING', 'Test result: PASSED, TIMEOUT, FAILED, or SKIPPED'), + ('timestamp', 'TIMESTAMP', 'Timestamp of test run'), + ('elapsed_time', 'FLOAT', 'How long test took to run'), ] def _get_build_metadata(test_results): - """Add Jenkins/Kokoro build metadata to test_results based on environment + """Add Jenkins/Kokoro build metadata to test_results based on environment variables set by Jenkins/Kokoro. """ - build_id = os.getenv('BUILD_ID') or os.getenv('KOKORO_BUILD_NUMBER') - build_url = os.getenv('BUILD_URL') or os.getenv('KOKORO_BUILD_URL') - job_name = os.getenv('JOB_BASE_NAME') or os.getenv('KOKORO_JOB_NAME') + build_id = os.getenv('BUILD_ID') or os.getenv('KOKORO_BUILD_NUMBER') + build_url = os.getenv('BUILD_URL') or os.getenv('KOKORO_BUILD_URL') + job_name = os.getenv('JOB_BASE_NAME') or os.getenv('KOKORO_JOB_NAME') - if build_id: - test_results['build_id'] = build_id - if build_url: - test_results['build_url'] = build_url - if job_name: - test_results['job_name'] = job_name + if build_id: + test_results['build_id'] = build_id + if build_url: + test_results['build_url'] = build_url + if job_name: + test_results['job_name'] = job_name def upload_results_to_bq(resultset, bq_table, args, platform): - """Upload test results to a BQ table. + """Upload test results to a BQ table. Args: resultset: dictionary generated by jobset.run @@ -91,77 +92,97 @@ def upload_results_to_bq(resultset, bq_table, args, platform): args: args in run_tests.py, generated by argparse platform: string name of platform tests were run on """ - bq = big_query_utils.create_big_query() - big_query_utils.create_partitioned_table(bq, _PROJECT_ID, _DATASET_ID, bq_table, _RESULTS_SCHEMA, _DESCRIPTION, - partition_type=_PARTITION_TYPE, expiration_ms= _EXPIRATION_MS) - - for shortname, results in six.iteritems(resultset): - for result in results: - test_results = {} - _get_build_metadata(test_results) - test_results['compiler'] = args.compiler - test_results['config'] = args.config - test_results['cpu_estimated'] = result.cpu_estimated - test_results['cpu_measured'] = result.cpu_measured - test_results['elapsed_time'] = '%.2f' % result.elapsed_time - test_results['iomgr_platform'] = args.iomgr_platform - # args.language is a list, but will always have one element in the contexts - # this function is used. - test_results['language'] = args.language[0] - test_results['platform'] = platform - test_results['result'] = result.state - test_results['return_code'] = result.returncode - test_results['test_name'] = shortname - test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') - - row = big_query_utils.make_row(str(uuid.uuid4()), test_results) - - # TODO(jtattermusch): rows are inserted one by one, very inefficient - max_retries = 3 - for attempt in range(max_retries): - if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]): - break - else: - if attempt < max_retries - 1: - print('Error uploading result to bigquery, will retry.') - else: - print('Error uploading result to bigquery, all attempts failed.') - sys.exit(1) + bq = big_query_utils.create_big_query() + big_query_utils.create_partitioned_table( + bq, + _PROJECT_ID, + _DATASET_ID, + bq_table, + _RESULTS_SCHEMA, + _DESCRIPTION, + partition_type=_PARTITION_TYPE, + expiration_ms=_EXPIRATION_MS) + + for shortname, results in six.iteritems(resultset): + for result in results: + test_results = {} + _get_build_metadata(test_results) + test_results['compiler'] = args.compiler + test_results['config'] = args.config + test_results['cpu_estimated'] = result.cpu_estimated + test_results['cpu_measured'] = result.cpu_measured + test_results['elapsed_time'] = '%.2f' % result.elapsed_time + test_results['iomgr_platform'] = args.iomgr_platform + # args.language is a list, but will always have one element in the contexts + # this function is used. + test_results['language'] = args.language[0] + test_results['platform'] = platform + test_results['result'] = result.state + test_results['return_code'] = result.returncode + test_results['test_name'] = shortname + test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') + + row = big_query_utils.make_row(str(uuid.uuid4()), test_results) + + # TODO(jtattermusch): rows are inserted one by one, very inefficient + max_retries = 3 + for attempt in range(max_retries): + if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, + bq_table, [row]): + break + else: + if attempt < max_retries - 1: + print('Error uploading result to bigquery, will retry.') + else: + print( + 'Error uploading result to bigquery, all attempts failed.' + ) + sys.exit(1) def upload_interop_results_to_bq(resultset, bq_table, args): - """Upload interop test results to a BQ table. + """Upload interop test results to a BQ table. Args: resultset: dictionary generated by jobset.run bq_table: string name of table to create/upload results to in BQ args: args in run_interop_tests.py, generated by argparse """ - bq = big_query_utils.create_big_query() - big_query_utils.create_partitioned_table(bq, _PROJECT_ID, _DATASET_ID, bq_table, _INTEROP_RESULTS_SCHEMA, _DESCRIPTION, - partition_type=_PARTITION_TYPE, expiration_ms= _EXPIRATION_MS) - - for shortname, results in six.iteritems(resultset): - for result in results: - test_results = {} - _get_build_metadata(test_results) - test_results['elapsed_time'] = '%.2f' % result.elapsed_time - test_results['result'] = result.state - test_results['test_name'] = shortname - test_results['suite'] = shortname.split(':')[0] - test_results['client'] = shortname.split(':')[1] - test_results['server'] = shortname.split(':')[2] - test_results['test_case'] = shortname.split(':')[3] - test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') - row = big_query_utils.make_row(str(uuid.uuid4()), test_results) - # TODO(jtattermusch): rows are inserted one by one, very inefficient - max_retries = 3 - for attempt in range(max_retries): - if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, bq_table, [row]): - break - else: - if attempt < max_retries - 1: - print('Error uploading result to bigquery, will retry.') - else: - print('Error uploading result to bigquery, all attempts failed.') - sys.exit(1) + bq = big_query_utils.create_big_query() + big_query_utils.create_partitioned_table( + bq, + _PROJECT_ID, + _DATASET_ID, + bq_table, + _INTEROP_RESULTS_SCHEMA, + _DESCRIPTION, + partition_type=_PARTITION_TYPE, + expiration_ms=_EXPIRATION_MS) + + for shortname, results in six.iteritems(resultset): + for result in results: + test_results = {} + _get_build_metadata(test_results) + test_results['elapsed_time'] = '%.2f' % result.elapsed_time + test_results['result'] = result.state + test_results['test_name'] = shortname + test_results['suite'] = shortname.split(':')[0] + test_results['client'] = shortname.split(':')[1] + test_results['server'] = shortname.split(':')[2] + test_results['test_case'] = shortname.split(':')[3] + test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') + row = big_query_utils.make_row(str(uuid.uuid4()), test_results) + # TODO(jtattermusch): rows are inserted one by one, very inefficient + max_retries = 3 + for attempt in range(max_retries): + if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, + bq_table, [row]): + break + else: + if attempt < max_retries - 1: + print('Error uploading result to bigquery, will retry.') + else: + print( + 'Error uploading result to bigquery, all attempts failed.' + ) + sys.exit(1) diff --git a/tools/run_tests/python_utils/watch_dirs.py b/tools/run_tests/python_utils/watch_dirs.py index 7bd085efaf4..d2ad303a07b 100755 --- a/tools/run_tests/python_utils/watch_dirs.py +++ b/tools/run_tests/python_utils/watch_dirs.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Helper to watch a (set) of directories for modifications.""" import os @@ -19,42 +18,42 @@ import time class DirWatcher(object): - """Helper to watch a (set) of directories for modifications.""" - - def __init__(self, paths): - if isinstance(paths, basestring): - paths = [paths] - self._done = False - self.paths = list(paths) - self.lastrun = time.time() - self._cache = self._calculate() - - def _calculate(self): - """Walk over all subscribed paths, check most recent mtime.""" - most_recent_change = None - for path in self.paths: - if not os.path.exists(path): - continue - if not os.path.isdir(path): - continue - for root, _, files in os.walk(path): - for f in files: - if f and f[0] == '.': continue - try: - st = os.stat(os.path.join(root, f)) - except OSError as e: - if e.errno == os.errno.ENOENT: - continue - raise - if most_recent_change is None: - most_recent_change = st.st_mtime - else: - most_recent_change = max(most_recent_change, st.st_mtime) - return most_recent_change - - def most_recent_change(self): - if time.time() - self.lastrun > 1: - self._cache = self._calculate() - self.lastrun = time.time() - return self._cache - + """Helper to watch a (set) of directories for modifications.""" + + def __init__(self, paths): + if isinstance(paths, basestring): + paths = [paths] + self._done = False + self.paths = list(paths) + self.lastrun = time.time() + self._cache = self._calculate() + + def _calculate(self): + """Walk over all subscribed paths, check most recent mtime.""" + most_recent_change = None + for path in self.paths: + if not os.path.exists(path): + continue + if not os.path.isdir(path): + continue + for root, _, files in os.walk(path): + for f in files: + if f and f[0] == '.': continue + try: + st = os.stat(os.path.join(root, f)) + except OSError as e: + if e.errno == os.errno.ENOENT: + continue + raise + if most_recent_change is None: + most_recent_change = st.st_mtime + else: + most_recent_change = max(most_recent_change, + st.st_mtime) + return most_recent_change + + def most_recent_change(self): + if time.time() - self.lastrun > 1: + self._cache = self._calculate() + self.lastrun = time.time() + return self._cache