From 1b38bb4bc745cf9f7ee18728d96698d3652dad9d Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Mon, 14 Dec 2015 17:22:38 -0800 Subject: [PATCH] initial version of run stress tests script --- .../jenkins/grpc_interop_cxx/build_interop.sh | 2 +- tools/run_tests/run_stress_tests.py | 378 ++++++++++++++++++ 2 files changed, 379 insertions(+), 1 deletion(-) create mode 100755 tools/run_tests/run_stress_tests.py diff --git a/tools/jenkins/grpc_interop_cxx/build_interop.sh b/tools/jenkins/grpc_interop_cxx/build_interop.sh index 1c0828d23a6..3634b7123f3 100755 --- a/tools/jenkins/grpc_interop_cxx/build_interop.sh +++ b/tools/jenkins/grpc_interop_cxx/build_interop.sh @@ -42,4 +42,4 @@ cd /var/local/git/grpc make install-certs # build C++ interop client & server -make interop_client interop_server +make interop_client interop_server stress_test diff --git a/tools/run_tests/run_stress_tests.py b/tools/run_tests/run_stress_tests.py new file mode 100755 index 00000000000..529dc4a48de --- /dev/null +++ b/tools/run_tests/run_stress_tests.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Run stress test in C++""" + +import argparse +import atexit +import dockerjob +import itertools +import jobset +import json +import multiprocessing +import os +import re +import report_utils +import subprocess +import sys +import tempfile +import time +import uuid + +# Docker doesn't clean up after itself, so we do it on exit. +atexit.register(lambda: subprocess.call(['stty', 'echo'])) + +ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) +os.chdir(ROOT) + +_DEFAULT_SERVER_PORT = 8080 +_DEFAULT_METRICS_PORT = 8081 +_DEFAULT_TEST_CASES = 'empty_unary:20,large_unary:20,client_streaming:20,server_streaming:20,empty_stream:20' +_DEFAULT_NUM_CHANNELS_PER_SERVER = 5 +_DEFAULT_NUM_STUBS_PER_CHANNEL = 10 + +# 15 mins default +#_DEFAULT_TEST_DURATION_SECS = 900 +_DEFAULT_TEST_DURATION_SECS = 10 + +class CXXLanguage: + + def __init__(self): + self.client_cwd = None + self.server_cwd = None + self.safename = 'cxx' + + def client_cmd(self, args): + return ['bins/opt/stress_test'] + args + + def server_cmd(self, args): + return ['bins/opt/interop_server'] + args + + def global_env(self): + return {} + + def __str__(self): + return 'c++' + + +_LANGUAGES = {'c++': CXXLanguage(),} + +# languages supported as cloud_to_cloud servers +_SERVERS = ['c++'] + +DOCKER_WORKDIR_ROOT = '/var/local/git/grpc' + + +def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None): + """Wraps given cmdline array to create 'docker run' cmdline from it.""" + docker_cmdline = ['docker', 'run', '-i', '--rm=true'] + + # turn environ into -e docker args + if environ: + for k, v in environ.iteritems(): + docker_cmdline += ['-e', '%s=%s' % (k, v)] + + # set working directory + workdir = DOCKER_WORKDIR_ROOT + if cwd: + workdir = os.path.join(workdir, cwd) + docker_cmdline += ['-w', workdir] + + docker_cmdline += docker_args + [image] + cmdline + return docker_cmdline + + +def bash_login_cmdline(cmdline): + """Creates bash -l -c cmdline from args list.""" + # Use login shell: + # * rvm and nvm require it + # * makes error messages clearer if executables are missing + return ['bash', '-l', '-c', ' '.join(cmdline)] + + +def _job_kill_handler(job): + if job._spec.container_name: + dockerjob.docker_kill(job._spec.container_name) + # When the job times out and we decide to kill it, + # we need to wait a before restarting the job + # to prevent "container name already in use" error. + # TODO(jtattermusch): figure out a cleaner way to to this. + time.sleep(2) + + +def cloud_to_cloud_jobspec(language, + test_cases, + server_addresses, + test_duration_secs, + num_channels_per_server, + num_stubs_per_channel, + metrics_port, + docker_image=None): + """Creates jobspec for cloud-to-cloud interop test""" + cmdline = bash_login_cmdline(language.client_cmd([ + '--test_cases=%s' % test_cases, '--server_addresses=%s' % + server_addresses, '--test_duration_secs=%s' % test_duration_secs, + '--num_stubs_per_channel=%s' % num_stubs_per_channel, + '--num_channels_per_server=%s' % num_channels_per_server, + '--metrics_port=%s' % metrics_port + ])) + print cmdline + cwd = language.client_cwd + environ = language.global_env() + if docker_image: + container_name = dockerjob.random_name('interop_client_%s' % + language.safename) + cmdline = docker_run_cmdline( + cmdline, + image=docker_image, + environ=environ, + cwd=cwd, + docker_args=['--net=host', '--name', container_name]) + cwd = None + + test_job = jobset.JobSpec(cmdline=cmdline, + cwd=cwd, + environ=environ, + shortname='cloud_to_cloud:%s:%s_server:stress_test' % ( + language, server_name), + timeout_seconds=test_duration_secs * 2, + flake_retries=5 if args.allow_flakes else 0, + timeout_retries=2 if args.allow_flakes else 0, + kill_handler=_job_kill_handler) + test_job.container_name = container_name + return test_job + + +def server_jobspec(language, docker_image, test_duration_secs): + """Create jobspec for running a server""" + container_name = dockerjob.random_name('interop_server_%s' % + language.safename) + cmdline = bash_login_cmdline(language.server_cmd(['--port=%s' % + _DEFAULT_SERVER_PORT])) + environ = language.global_env() + docker_cmdline = docker_run_cmdline( + cmdline, + image=docker_image, + cwd=language.server_cwd, + environ=environ, + docker_args=['-p', str(_DEFAULT_SERVER_PORT), '--name', container_name]) + + server_job = jobset.JobSpec(cmdline=docker_cmdline, + environ=environ, + shortname='interop_server_%s' % language, + timeout_seconds=test_duration_secs * 3) + server_job.container_name = container_name + return server_job + + +def build_interop_image_jobspec(language, tag=None): + """Creates jobspec for building stress test docker image for a language""" + if not tag: + tag = 'grpc_interop_%s:%s' % (language.safename, uuid.uuid4()) + env = {'INTEROP_IMAGE': tag, + 'BASE_NAME': 'grpc_interop_%s' % language.safename} + env['TTY_FLAG'] = '-t' + build_job = jobset.JobSpec(cmdline=['tools/jenkins/build_interop_image.sh'], + environ=env, + shortname='build_docker_%s' % (language), + timeout_seconds=30 * 60) + build_job.tag = tag + return build_job + + +def aggregate_http2_results(stdout): + match = re.search(r'\{"cases[^\]]*\]\}', stdout) + if not match: + return None + + results = json.loads(match.group(0)) + skipped = 0 + passed = 0 + failed = 0 + failed_cases = [] + for case in results['cases']: + if case.get('skipped', False): + skipped += 1 + else: + if case.get('passed', False): + passed += 1 + else: + failed += 1 + failed_cases.append(case.get('name', 'NONAME')) + return { + 'passed': passed, + 'failed': failed, + 'skipped': skipped, + 'failed_cases': ', '.join(failed_cases), + 'percent': 1.0 * passed / (passed + failed) + } + + +argp = argparse.ArgumentParser(description='Run stress tests.') +argp.add_argument('-l', + '--language', + choices=['all'] + sorted(_LANGUAGES), + nargs='+', + default=['all'], + help='Clients to run.') +argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) +argp.add_argument( + '-s', + '--server', + choices=['all'] + sorted(_SERVERS), + action='append', + help='Run cloud_to_cloud servers in a separate docker ' + 'image.', + default=[]) +argp.add_argument( + '--override_server', + action='append', + type=lambda kv: kv.split('='), + help= + 'Use servername=HOST:PORT to explicitly specify a server. E.g. ' + 'csharp=localhost:50000', + default=[]) +argp.add_argument('--test_duration_secs', + action='append', + help='The duration of the test in seconds', + default=[_DEFAULT_TEST_DURATION_SECS]) +argp.add_argument( + '--allow_flakes', + default=False, + action='store_const', + const=True, + help= + 'Allow flaky tests to show as passing (re-runs failed tests up to five times)') + +args = argp.parse_args() + +servers = set( + s + for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x] + for x in args.server)) + +languages = set(_LANGUAGES[l] + for l in itertools.chain.from_iterable(_LANGUAGES.iterkeys( + ) if x == 'all' else [x] for x in args.language)) + +docker_images = {} +# languages for which to build docker images +languages_to_build = set( + _LANGUAGES[k] + for k in set([str(l) for l in languages] + [s for s in servers])) +build_jobs = [] +for l in languages_to_build: + job = build_interop_image_jobspec(l) + docker_images[str(l)] = job.tag + build_jobs.append(job) + +if build_jobs: + jobset.message('START', 'Building interop docker images.', do_newline=True) + num_failures, _ = jobset.run(build_jobs, + newline_on_success=True, + maxjobs=args.jobs) + if num_failures == 0: + jobset.message('SUCCESS', + 'All docker images built successfully.', + do_newline=True) + else: + jobset.message('FAILED', + 'Failed to build interop docker images.', + do_newline=True) + for image in docker_images.itervalues(): + dockerjob.remove_image(image, skip_nonexistent=True) + sys.exit(1) + +# Start interop servers. +server_jobs = {} +server_addresses = {} +try: + for s in servers: + lang = str(s) + spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), _DEFAULT_TEST_DURATION_SECS) + job = dockerjob.DockerJob(spec) + server_jobs[lang] = job + server_addresses[lang] = ('localhost', + job.mapped_port(_DEFAULT_SERVER_PORT)) + + jobs = [] + + for server in args.override_server: + server_name = server[0] + (server_host, server_port) = server[1].split(':') + server_addresses[server_name] = (server_host, server_port) + + for server_name, server_address in server_addresses.iteritems(): + (server_host, server_port) = server_address + for language in languages: + test_job = cloud_to_cloud_jobspec( + language, + _DEFAULT_TEST_CASES, + ('%s:%s' % (server_host, server_port)), + _DEFAULT_TEST_DURATION_SECS, + _DEFAULT_NUM_CHANNELS_PER_SERVER, + _DEFAULT_NUM_STUBS_PER_CHANNEL, + _DEFAULT_METRICS_PORT, + docker_image=docker_images.get(str(language))) + jobs.append(test_job) + + if not jobs: + print 'No jobs to run.' + for image in docker_images.itervalues(): + dockerjob.remove_image(image, skip_nonexistent=True) + sys.exit(1) + + num_failures, resultset = jobset.run(jobs, + newline_on_success=True, + maxjobs=args.jobs) + if num_failures: + jobset.message('FAILED', 'Some tests failed', do_newline=True) + else: + jobset.message('SUCCESS', 'All tests passed', do_newline=True) + + report_utils.render_junit_xml_report(resultset, 'report.xml') + + for name, job in resultset.iteritems(): + if "http2" in name: + job[0].http2results = aggregate_http2_results(job[0].message) + + report_utils.render_interop_html_report( + set([str(l) for l in languages]), servers, [], [], [], resultset, + num_failures, 0, 0) + +finally: + # Check if servers are still running. + for server, job in server_jobs.iteritems(): + if not job.is_running(): + print 'Server "%s" has exited prematurely.' % server + + dockerjob.finish_jobs([j for j in server_jobs.itervalues()]) + + for image in docker_images.itervalues(): + print 'Removing docker image %s' % image + dockerjob.remove_image(image)