#!/usr/bin/env python # Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """Run stress test in C++""" import argparse import atexit import dockerjob import itertools import jobset import json import multiprocessing import os import re import subprocess import sys import tempfile import time import uuid # Docker doesn't clean up after itself, so we do it on exit. atexit.register(lambda: subprocess.call(['stty', 'echo'])) ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) os.chdir(ROOT) _DEFAULT_SERVER_PORT = 8080 _DEFAULT_METRICS_PORT = 8081 _DEFAULT_TEST_CASES = 'empty_unary:20,large_unary:20,client_streaming:20,server_streaming:20,empty_stream:20' _DEFAULT_NUM_CHANNELS_PER_SERVER = 5 _DEFAULT_NUM_STUBS_PER_CHANNEL = 10 # 15 mins default _DEFAULT_TEST_DURATION_SECS = 900 class CXXLanguage: def __init__(self): self.client_cwd = None self.server_cwd = None self.safename = 'cxx' def client_cmd(self, args): return ['bins/opt/stress_test'] + args def server_cmd(self, args): return ['bins/opt/interop_server'] + args def global_env(self): return {} def __str__(self): return 'c++' _LANGUAGES = {'c++': CXXLanguage(),} # languages supported as cloud_to_cloud servers _SERVERS = ['c++'] DOCKER_WORKDIR_ROOT = '/var/local/git/grpc' def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None): """Wraps given cmdline array to create 'docker run' cmdline from it.""" docker_cmdline = ['docker', 'run', '-i', '--rm=true'] # turn environ into -e docker args if environ: for k, v in environ.iteritems(): docker_cmdline += ['-e', '%s=%s' % (k, v)] # set working directory workdir = DOCKER_WORKDIR_ROOT if cwd: workdir = os.path.join(workdir, cwd) docker_cmdline += ['-w', workdir] docker_cmdline += docker_args + [image] + cmdline return docker_cmdline def bash_login_cmdline(cmdline): """Creates bash -l -c cmdline from args list.""" # Use login shell: # * rvm and nvm require it # * makes error messages clearer if executables are missing return ['bash', '-l', '-c', ' '.join(cmdline)] def _job_kill_handler(job): if job._spec.container_name: dockerjob.docker_kill(job._spec.container_name) # When the job times out and we decide to kill it, # we need to wait a before restarting the job # to prevent "container name already in use" error. # TODO(jtattermusch): figure out a cleaner way to to this. time.sleep(2) def cloud_to_cloud_jobspec(language, test_cases, server_addresses, test_duration_secs, num_channels_per_server, num_stubs_per_channel, metrics_port, docker_image=None): """Creates jobspec for cloud-to-cloud interop test""" cmdline = bash_login_cmdline(language.client_cmd([ '--test_cases=%s' % test_cases, '--server_addresses=%s' % server_addresses, '--test_duration_secs=%s' % test_duration_secs, '--num_stubs_per_channel=%s' % num_stubs_per_channel, '--num_channels_per_server=%s' % num_channels_per_server, '--metrics_port=%s' % metrics_port ])) print cmdline cwd = language.client_cwd environ = language.global_env() if docker_image: container_name = dockerjob.random_name('interop_client_%s' % language.safename) cmdline = docker_run_cmdline( cmdline, image=docker_image, environ=environ, cwd=cwd, docker_args=['--net=host', '--name', container_name]) cwd = None test_job = jobset.JobSpec(cmdline=cmdline, cwd=cwd, environ=environ, shortname='cloud_to_cloud:%s:%s_server:stress_test' % ( language, server_name), timeout_seconds=test_duration_secs * 2, flake_retries=0, timeout_retries=0, kill_handler=_job_kill_handler) test_job.container_name = container_name return test_job def server_jobspec(language, docker_image, test_duration_secs): """Create jobspec for running a server""" container_name = dockerjob.random_name('interop_server_%s' % language.safename) cmdline = bash_login_cmdline(language.server_cmd(['--port=%s' % _DEFAULT_SERVER_PORT])) environ = language.global_env() docker_cmdline = docker_run_cmdline( cmdline, image=docker_image, cwd=language.server_cwd, environ=environ, docker_args=['-p', str(_DEFAULT_SERVER_PORT), '--name', container_name]) server_job = jobset.JobSpec(cmdline=docker_cmdline, environ=environ, shortname='interop_server_%s' % language, timeout_seconds=test_duration_secs * 3) server_job.container_name = container_name return server_job def build_interop_stress_image_jobspec(language, tag=None): """Creates jobspec for building stress test docker image for a language""" if not tag: tag = 'grpc_interop_stress_%s:%s' % (language.safename, uuid.uuid4()) env = {'INTEROP_IMAGE': tag, 'BASE_NAME': 'grpc_interop_stress_%s' % language.safename} build_job = jobset.JobSpec(cmdline=['tools/jenkins/build_interop_stress_image.sh'], environ=env, shortname='build_docker_%s' % (language), timeout_seconds=30 * 60) build_job.tag = tag return build_job argp = argparse.ArgumentParser(description='Run stress tests.') argp.add_argument('-l', '--language', choices=['all'] + sorted(_LANGUAGES), nargs='+', default=['all'], help='Clients to run.') argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) argp.add_argument( '-s', '--server', choices=['all'] + sorted(_SERVERS), action='append', help='Run cloud_to_cloud servers in a separate docker ' + 'image.', default=[]) argp.add_argument( '--override_server', action='append', type=lambda kv: kv.split('='), help= 'Use servername=HOST:PORT to explicitly specify a server. E.g. ' 'csharp=localhost:50000', default=[]) argp.add_argument('--test_duration_secs', help='The duration of the test in seconds', default=_DEFAULT_TEST_DURATION_SECS) args = argp.parse_args() servers = set( s for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x] for x in args.server)) languages = set(_LANGUAGES[l] for l in itertools.chain.from_iterable(_LANGUAGES.iterkeys( ) if x == 'all' else [x] for x in args.language)) docker_images = {} # languages for which to build docker images languages_to_build = set( _LANGUAGES[k] for k in set([str(l) for l in languages] + [s for s in servers])) build_jobs = [] for l in languages_to_build: job = build_interop_stress_image_jobspec(l) docker_images[str(l)] = job.tag build_jobs.append(job) if build_jobs: jobset.message('START', 'Building interop docker images.', do_newline=True) num_failures, _ = jobset.run(build_jobs, newline_on_success=True, maxjobs=args.jobs) if num_failures == 0: jobset.message('SUCCESS', 'All docker images built successfully.', do_newline=True) else: jobset.message('FAILED', 'Failed to build interop docker images.', do_newline=True) for image in docker_images.itervalues(): dockerjob.remove_image(image, skip_nonexistent=True) sys.exit(1) # Start interop servers. server_jobs = {} server_addresses = {} try: for s in servers: lang = str(s) spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), args.test_duration_secs) job = dockerjob.DockerJob(spec) server_jobs[lang] = job server_addresses[lang] = ('localhost', job.mapped_port(_DEFAULT_SERVER_PORT)) jobs = [] for server in args.override_server: server_name = server[0] (server_host, server_port) = server[1].split(':') server_addresses[server_name] = (server_host, server_port) for server_name, server_address in server_addresses.iteritems(): (server_host, server_port) = server_address for language in languages: test_job = cloud_to_cloud_jobspec( language, _DEFAULT_TEST_CASES, ('%s:%s' % (server_host, server_port)), args.test_duration_secs, _DEFAULT_NUM_CHANNELS_PER_SERVER, _DEFAULT_NUM_STUBS_PER_CHANNEL, _DEFAULT_METRICS_PORT, docker_image=docker_images.get(str(language))) jobs.append(test_job) if not jobs: print 'No jobs to run.' for image in docker_images.itervalues(): dockerjob.remove_image(image, skip_nonexistent=True) sys.exit(1) num_failures, resultset = jobset.run(jobs, newline_on_success=True, maxjobs=args.jobs) if num_failures: jobset.message('FAILED', 'Some tests failed', do_newline=True) else: jobset.message('SUCCESS', 'All tests passed', do_newline=True) finally: # Check if servers are still running. for server, job in server_jobs.iteritems(): if not job.is_running(): print 'Server "%s" has exited prematurely.' % server dockerjob.finish_jobs([j for j in server_jobs.itervalues()]) for image in docker_images.itervalues(): print 'Removing docker image %s' % image dockerjob.remove_image(image)