mirror of https://github.com/grpc/grpc.git
parent
062ad48888
commit
1b38bb4bc7
2 changed files with 379 additions and 1 deletions
@ -0,0 +1,378 @@ |
||||
#!/usr/bin/env python |
||||
# Copyright 2015, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
"""Run stress test in C++""" |
||||
|
||||
import argparse |
||||
import atexit |
||||
import dockerjob |
||||
import itertools |
||||
import jobset |
||||
import json |
||||
import multiprocessing |
||||
import os |
||||
import re |
||||
import report_utils |
||||
import subprocess |
||||
import sys |
||||
import tempfile |
||||
import time |
||||
import uuid |
||||
|
||||
# Docker doesn't clean up after itself, so we do it on exit.
# (killing a container mid-run can leave the terminal with echo disabled)
atexit.register(lambda: subprocess.call(['stty', 'echo']))

# Run everything relative to the repository root (two levels up from this
# script) so relative paths like 'bins/opt/...' resolve correctly.
ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
os.chdir(ROOT)

# Ports used inside the server container: one for the interop server
# itself, one for the stress client's metrics endpoint.
_DEFAULT_SERVER_PORT = 8080
_DEFAULT_METRICS_PORT = 8081
# Comma-separated 'test_case:weight' pairs passed to the stress client.
_DEFAULT_TEST_CASES = 'empty_unary:20,large_unary:20,client_streaming:20,server_streaming:20,empty_stream:20'
_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
_DEFAULT_NUM_STUBS_PER_CHANNEL = 10

# NOTE(review): the intended default appears to be 15 minutes (900 secs,
# kept commented out below); the active value of 10 secs looks like a
# leftover debugging setting — confirm before real stress runs.
#_DEFAULT_TEST_DURATION_SECS = 900
_DEFAULT_TEST_DURATION_SECS = 10
||||
|
||||
class CXXLanguage:
  """Describes how to launch the C++ stress client and interop server."""

  # Binaries are addressed relative to the repository root.
  _CLIENT_BINARY = 'bins/opt/stress_test'
  _SERVER_BINARY = 'bins/opt/interop_server'

  def __init__(self):
    # Commands run from the repo root, so no per-process cwd is needed.
    self.client_cwd = None
    self.server_cwd = None
    # Name safe for use in docker container/image names.
    self.safename = 'cxx'

  def client_cmd(self, args):
    """Build the stress-test client command line with extra *args* appended."""
    return [self._CLIENT_BINARY] + args

  def server_cmd(self, args):
    """Build the interop server command line with extra *args* appended."""
    return [self._SERVER_BINARY] + args

  def global_env(self):
    """Environment overrides required by this language; C++ needs none."""
    return {}

  def __str__(self):
    return 'c++'
||||
|
||||
|
||||
# Registry of stress-test capable client languages, keyed by display name.
_LANGUAGES = {'c++': CXXLanguage(),}

# languages supported as cloud_to_cloud servers
_SERVERS = ['c++']

# Path inside the docker image where the grpc checkout lives; used as the
# container working directory by docker_run_cmdline().
DOCKER_WORKDIR_ROOT = '/var/local/git/grpc'
||||
|
||||
|
||||
def docker_run_cmdline(cmdline, image, docker_args=None, cwd=None, environ=None):
  """Wraps given cmdline array to create 'docker run' cmdline from it.

  Args:
    cmdline: list of strings, command to run inside the container.
    image: docker image name/tag to run.
    docker_args: optional list of extra 'docker run' arguments. Defaults to
      None instead of a mutable [] so the default can never be shared and
      mutated across calls.
    cwd: optional working directory, joined under DOCKER_WORKDIR_ROOT.
    environ: optional dict of environment variables to pass via '-e'.

  Returns:
    The full 'docker run ...' command line as a list of strings.
  """
  docker_cmdline = ['docker', 'run', '-i', '--rm=true']

  # turn environ into -e docker args
  if environ:
    # .items() (rather than .iteritems()) behaves identically on Python 2
    # and keeps the function working on Python 3.
    for k, v in environ.items():
      docker_cmdline += ['-e', '%s=%s' % (k, v)]

  # set working directory
  workdir = DOCKER_WORKDIR_ROOT
  if cwd:
    workdir = os.path.join(workdir, cwd)
  docker_cmdline += ['-w', workdir]

  docker_cmdline += (docker_args or []) + [image] + cmdline
  return docker_cmdline
||||
|
||||
|
||||
def bash_login_cmdline(cmdline):
  """Creates bash -l -c cmdline from args list."""
  # A login shell is used because:
  #   * rvm and nvm require it
  #   * it makes error messages clearer if executables are missing
  joined_command = ' '.join(cmdline)
  return ['bash', '-l', '-c', joined_command]
||||
|
||||
|
||||
def _job_kill_handler(job):
  """Kill handler passed to jobset: tears down the job's docker container.

  job._spec.container_name is only set on dockerized job specs (see
  cloud_to_cloud_jobspec), so non-docker jobs are a no-op here.
  """
  if job._spec.container_name:
    dockerjob.docker_kill(job._spec.container_name)
    # When the job times out and we decide to kill it,
    # we need to wait a moment before restarting the job
    # to prevent "container name already in use" error.
    # TODO(jtattermusch): figure out a cleaner way to do this.
    time.sleep(2)
||||
|
||||
|
||||
def cloud_to_cloud_jobspec(language, |
||||
test_cases, |
||||
server_addresses, |
||||
test_duration_secs, |
||||
num_channels_per_server, |
||||
num_stubs_per_channel, |
||||
metrics_port, |
||||
docker_image=None): |
||||
"""Creates jobspec for cloud-to-cloud interop test""" |
||||
cmdline = bash_login_cmdline(language.client_cmd([ |
||||
'--test_cases=%s' % test_cases, '--server_addresses=%s' % |
||||
server_addresses, '--test_duration_secs=%s' % test_duration_secs, |
||||
'--num_stubs_per_channel=%s' % num_stubs_per_channel, |
||||
'--num_channels_per_server=%s' % num_channels_per_server, |
||||
'--metrics_port=%s' % metrics_port |
||||
])) |
||||
print cmdline |
||||
cwd = language.client_cwd |
||||
environ = language.global_env() |
||||
if docker_image: |
||||
container_name = dockerjob.random_name('interop_client_%s' % |
||||
language.safename) |
||||
cmdline = docker_run_cmdline( |
||||
cmdline, |
||||
image=docker_image, |
||||
environ=environ, |
||||
cwd=cwd, |
||||
docker_args=['--net=host', '--name', container_name]) |
||||
cwd = None |
||||
|
||||
test_job = jobset.JobSpec(cmdline=cmdline, |
||||
cwd=cwd, |
||||
environ=environ, |
||||
shortname='cloud_to_cloud:%s:%s_server:stress_test' % ( |
||||
language, server_name), |
||||
timeout_seconds=test_duration_secs * 2, |
||||
flake_retries=5 if args.allow_flakes else 0, |
||||
timeout_retries=2 if args.allow_flakes else 0, |
||||
kill_handler=_job_kill_handler) |
||||
test_job.container_name = container_name |
||||
return test_job |
||||
|
||||
|
||||
def server_jobspec(language, docker_image, test_duration_secs):
  """Create jobspec for running a server"""
  container_name = dockerjob.random_name('interop_server_%s' % language.safename)
  # The server only needs its listen port configured.
  server_cmdline = language.server_cmd(['--port=%s' % _DEFAULT_SERVER_PORT])
  cmdline = bash_login_cmdline(server_cmdline)
  environ = language.global_env()
  # Publish the server port so the host-side driver can look up the mapped
  # port via DockerJob.mapped_port() later.
  extra_docker_args = ['-p', str(_DEFAULT_SERVER_PORT), '--name', container_name]
  docker_cmdline = docker_run_cmdline(cmdline,
                                      image=docker_image,
                                      cwd=language.server_cwd,
                                      environ=environ,
                                      docker_args=extra_docker_args)

  # Allow the server to outlive the client runs (3x the test duration).
  server_job = jobset.JobSpec(cmdline=docker_cmdline,
                              environ=environ,
                              shortname='interop_server_%s' % language,
                              timeout_seconds=test_duration_secs * 3)
  # Remembered so the driver can kill/clean up the container.
  server_job.container_name = container_name
  return server_job
||||
|
||||
|
||||
def build_interop_image_jobspec(language, tag=None):
  """Creates jobspec for building stress test docker image for a language"""
  # Generate a unique tag unless the caller supplied a non-empty one.
  if not tag:
    tag = 'grpc_interop_%s:%s' % (language.safename, uuid.uuid4())
  # Environment consumed by the build script.
  env = {
      'INTEROP_IMAGE': tag,
      'BASE_NAME': 'grpc_interop_%s' % language.safename,
      'TTY_FLAG': '-t',
  }
  build_job = jobset.JobSpec(cmdline=['tools/jenkins/build_interop_image.sh'],
                             environ=env,
                             shortname='build_docker_%s' % (language),
                             timeout_seconds=30 * 60)
  # Remember the tag so callers can map language -> built image.
  build_job.tag = tag
  return build_job
||||
|
||||
|
||||
def aggregate_http2_results(stdout):
  """Extract and summarize the http2 results JSON embedded in *stdout*.

  Args:
    stdout: captured job output that may contain a '{"cases": [...]}' blob.

  Returns:
    A dict with 'passed'/'failed'/'skipped' counts, a comma-joined
    'failed_cases' string and a 'percent' pass rate, or None when no
    results JSON is present.
  """
  match = re.search(r'\{"cases[^\]]*\]\}', stdout)
  if not match:
    return None

  results = json.loads(match.group(0))
  skipped = 0
  passed = 0
  failed = 0
  failed_cases = []
  for case in results['cases']:
    if case.get('skipped', False):
      skipped += 1
    elif case.get('passed', False):
      passed += 1
    else:
      failed += 1
      failed_cases.append(case.get('name', 'NONAME'))
  executed = passed + failed
  return {
      'passed': passed,
      'failed': failed,
      'skipped': skipped,
      'failed_cases': ', '.join(failed_cases),
      # Guard: the original divided by (passed + failed) unconditionally and
      # raised ZeroDivisionError when every case was skipped.
      'percent': 1.0 * passed / executed if executed else 0.0
  }
||||
|
||||
|
||||
# Command-line interface. Parsed at import time; the resulting 'args' is
# consumed as a module-level global by cloud_to_cloud_jobspec and the
# driver code below.
argp = argparse.ArgumentParser(description='Run stress tests.')
argp.add_argument('-l',
                  '--language',
                  choices=['all'] + sorted(_LANGUAGES),
                  nargs='+',
                  default=['all'],
                  help='Clients to run.')
# Parallelism for both image builds and test runs.
argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)
argp.add_argument(
    '-s',
    '--server',
    choices=['all'] + sorted(_SERVERS),
    action='append',
    help='Run cloud_to_cloud servers in a separate docker ' + 'image.',
    default=[])
# Each value is split on '=' into a [name, host:port] pair.
argp.add_argument(
    '--override_server',
    action='append',
    type=lambda kv: kv.split('='),
    help=
    'Use servername=HOST:PORT to explicitly specify a server. E.g. '
    'csharp=localhost:50000',
    default=[])
# NOTE(review): with action='append', user-supplied values are appended to
# this non-empty default rather than replacing it, and the parsed value is
# never read below (_DEFAULT_TEST_DURATION_SECS is used directly) —
# confirm whether this flag was meant to be wired through.
argp.add_argument('--test_duration_secs',
                  action='append',
                  help='The duration of the test in seconds',
                  default=[_DEFAULT_TEST_DURATION_SECS])
argp.add_argument(
    '--allow_flakes',
    default=False,
    action='store_const',
    const=True,
    help=
    'Allow flaky tests to show as passing (re-runs failed tests up to five times)')

args = argp.parse_args()
||||
|
||||
# Expand '-s all' into the full server list; otherwise keep the named ones.
servers = set(
    s
    for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x]
                                           for x in args.server))

# Expand '-l all' into every registered language and map names to the
# corresponding language objects.
languages = set(_LANGUAGES[l]
                for l in itertools.chain.from_iterable(_LANGUAGES.iterkeys(
                ) if x == 'all' else [x] for x in args.language))

# Maps language name -> docker image tag once images are built.
docker_images = {}
# languages for which to build docker images
# (union of client languages and requested servers, deduplicated by name)
languages_to_build = set(
    _LANGUAGES[k]
    for k in set([str(l) for l in languages] + [s for s in servers]))
build_jobs = []
for l in languages_to_build:
  job = build_interop_image_jobspec(l)
  docker_images[str(l)] = job.tag
  build_jobs.append(job)

# Build all images up front; abort (and clean up any images that did get
# built) if any build fails.
if build_jobs:
  jobset.message('START', 'Building interop docker images.', do_newline=True)
  num_failures, _ = jobset.run(build_jobs,
                               newline_on_success=True,
                               maxjobs=args.jobs)
  if num_failures == 0:
    jobset.message('SUCCESS',
                   'All docker images built successfully.',
                   do_newline=True)
  else:
    jobset.message('FAILED',
                   'Failed to build interop docker images.',
                   do_newline=True)
    for image in docker_images.itervalues():
      dockerjob.remove_image(image, skip_nonexistent=True)
    sys.exit(1)
||||
|
||||
# Start interop servers.
server_jobs = {}       # language name -> DockerJob running that server
server_addresses = {}  # language name -> (host, port) clients should hit
try:
  for s in servers:
    lang = str(s)
    spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), _DEFAULT_TEST_DURATION_SECS)
    job = dockerjob.DockerJob(spec)
    server_jobs[lang] = job
    # Ask docker for the host port mapped to the container's server port.
    server_addresses[lang] = ('localhost',
                              job.mapped_port(_DEFAULT_SERVER_PORT))

  jobs = []

  # Externally-specified servers override/extend the docker-started ones.
  for server in args.override_server:
    server_name = server[0]
    (server_host, server_port) = server[1].split(':')
    server_addresses[server_name] = (server_host, server_port)

  # One stress-client job per (server, client language) pair.
  # NOTE(review): cloud_to_cloud_jobspec reads the loop variable
  # 'server_name' as a global for its job shortname.
  for server_name, server_address in server_addresses.iteritems():
    (server_host, server_port) = server_address
    for language in languages:
      test_job = cloud_to_cloud_jobspec(
          language,
          _DEFAULT_TEST_CASES,
          ('%s:%s' % (server_host, server_port)),
          _DEFAULT_TEST_DURATION_SECS,
          _DEFAULT_NUM_CHANNELS_PER_SERVER,
          _DEFAULT_NUM_STUBS_PER_CHANNEL,
          _DEFAULT_METRICS_PORT,
          docker_image=docker_images.get(str(language)))
      jobs.append(test_job)

  # Nothing to run: clean up the built images and exit with failure.
  if not jobs:
    print 'No jobs to run.'
    for image in docker_images.itervalues():
      dockerjob.remove_image(image, skip_nonexistent=True)
    sys.exit(1)

  num_failures, resultset = jobset.run(jobs,
                                       newline_on_success=True,
                                       maxjobs=args.jobs)
  if num_failures:
    jobset.message('FAILED', 'Some tests failed', do_newline=True)
  else:
    jobset.message('SUCCESS', 'All tests passed', do_newline=True)

  report_utils.render_junit_xml_report(resultset, 'report.xml')

  # Attach parsed http2 summaries to the relevant job results before
  # rendering the HTML report.
  for name, job in resultset.iteritems():
    if "http2" in name:
      job[0].http2results = aggregate_http2_results(job[0].message)

  report_utils.render_interop_html_report(
      set([str(l) for l in languages]), servers, [], [], [], resultset,
      num_failures, 0, 0)

finally:
  # Check if servers are still running.
  for server, job in server_jobs.iteritems():
    if not job.is_running():
      print 'Server "%s" has exited prematurely.' % server

  # Always tear down server containers and remove built images, even when
  # the test phase raised or exited early.
  dockerjob.finish_jobs([j for j in server_jobs.itervalues()])

  for image in docker_images.itervalues():
    print 'Removing docker image %s' % image
    dockerjob.remove_image(image)
Loading…
Reference in new issue