yapf run_tests

Branch: pull/13719/head
Author: ncteisen, 7 years ago
Parent: a69c6901f9
Commit: 888093c6ed

Files changed (changed-line counts):
  tools/distrib/yapf_code.sh                 (5)
  tools/run_tests/run_build_statistics.py    (42)
  tools/run_tests/run_interop_tests.py       (386)
  tools/run_tests/run_microbenchmark.py      (112)
  tools/run_tests/run_performance_tests.py   (337)
  tools/run_tests/run_tests.py               (842)
  tools/run_tests/run_tests_matrix.py        (224)
  tools/run_tests/start_port_server.py       (1)
  tools/run_tests/task_runner.py             (30)
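
This commit is the output of running yapf over the run_tests tooling; tools/distrib/yapf_code.sh drives the formatting pass. A minimal sketch of an equivalent pass using yapf's Python API follows; the style name and the in-place flag are assumptions for illustration, not necessarily what yapf_code.sh uses:

    # Sketch: reformat the touched scripts in place with yapf.
    # Assumes yapf is installed; style_config is illustrative.
    from yapf.yapflib.yapf_api import FormatFile

    FILES = [
        'tools/run_tests/run_build_statistics.py',
        'tools/run_tests/run_interop_tests.py',
        'tools/run_tests/run_microbenchmark.py',
        'tools/run_tests/run_performance_tests.py',
        'tools/run_tests/run_tests.py',
        'tools/run_tests/run_tests_matrix.py',
        'tools/run_tests/start_port_server.py',
        'tools/run_tests/task_runner.py',
    ]

    for path in FILES:
        # FormatFile returns (reformatted_text, encoding, changed).
        _, _, changed = FormatFile(path, style_config='google', in_place=True)
        print('%-45s %s' % (path, 'reformatted' if changed else 'unchanged'))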

--- a/tools/distrib/yapf_code.sh
+++ b/tools/distrib/yapf_code.sh
@@ -25,10 +25,7 @@ DIRS=(
 'tools/distrib'
 'tools/interop_matrix'
 'tools/profiling'
-'tools/run_tests/python_utils'
-'tools/run_tests/sanity'
-'tools/run_tests/performance'
-'tools/run_tests/artifacts'
+'tools/run_tests'
 )
 EXCLUSIONS=(
 'grpcio/grpc_*.py'

--- a/tools/run_tests/run_build_statistics.py
+++ b/tools/run_tests/run_build_statistics.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 """Tool to get build statistics from Jenkins and upload to BigQuery."""

 from __future__ import print_function
@@ -27,16 +26,15 @@ import re
 import sys
 import urllib

-gcp_utils_dir = os.path.abspath(os.path.join(
-    os.path.dirname(__file__), '../gcp/utils'))
+gcp_utils_dir = os.path.abspath(
+    os.path.join(os.path.dirname(__file__), '../gcp/utils'))
 sys.path.append(gcp_utils_dir)
 import big_query_utils

 _PROJECT_ID = 'grpc-testing'
 _HAS_MATRIX = True
-_BUILDS = {'gRPC_interop_master': not _HAS_MATRIX,
+_BUILDS = {
+    'gRPC_interop_master': not _HAS_MATRIX,
     'gRPC_master_linux': not _HAS_MATRIX,
     'gRPC_master_macos': not _HAS_MATRIX,
     'gRPC_master_windows': not _HAS_MATRIX,
@@ -104,9 +102,12 @@ def _scrape_for_known_errors(html):
         errors = re.findall(known_error, html)
         this_error_count = len(errors)
         if this_error_count > 0:
-            error_list.append({'description': known_error,
-                               'count': this_error_count})
-            print('====> %d failures due to %s' % (this_error_count, known_error))
+            error_list.append({
+                'description': known_error,
+                'count': this_error_count
+            })
+            print('====> %d failures due to %s' %
+                  (this_error_count, known_error))

     return error_list
@@ -119,8 +120,7 @@ def _get_last_processed_buildnumber(build_name):
         _PROJECT_ID, _DATASET_ID, build_name)
     query_job = big_query_utils.sync_query_job(bq, _PROJECT_ID, query)
     page = bq.jobs().getQueryResults(
-        pageToken=None,
-        **query_job['jobReference']).execute(num_retries=3)
+        pageToken=None, **query_job['jobReference']).execute(num_retries=3)
     if page['rows'][0]['f'][0]['v']:
         return int(page['rows'][0]['f'][0]['v'])
     return 0
@@ -136,8 +136,10 @@ def _process_matrix(build, url_base):
             url_base, matrix_tuple[0], matrix_tuple[1], matrix_tuple[2])
         console_url = '%s/config=%s,language=%s,platform=%s/consoleFull' % (
             url_base, matrix_tuple[0], matrix_tuple[1], matrix_tuple[2])
-        matrix_dict = {'name': matrix_str,
-                       'duration': matrix.get_duration().total_seconds()}
+        matrix_dict = {
+            'name': matrix_str,
+            'duration': matrix.get_duration().total_seconds()
+        }
         matrix_dict.update(_process_build(json_url, console_url))
         matrix_list.append(matrix_dict)
@@ -184,7 +186,9 @@ def _process_build(json_url, console_url):
 # parse command line
 argp = argparse.ArgumentParser(description='Get build statistics.')
 argp.add_argument('-u', '--username', default='jenkins')
-argp.add_argument('-b', '--builds',
+argp.add_argument(
+    '-b',
+    '--builds',
     choices=['all'] + sorted(_BUILDS.keys()),
     nargs='+',
     default=['all'])
@@ -227,8 +231,10 @@ for build_name in _BUILDS.keys() if 'all' in args.builds else args.builds:
     except KeyError:
         print('====> Build %s is missing. Skip.' % build_number)
         continue
-    build_result = {'build_number': build_number,
-                    'timestamp': str(build.get_timestamp())}
+    build_result = {
+        'build_number': build_number,
+        'timestamp': str(build.get_timestamp())
+    }
     url_base = json_url = '%s/%s/%d' % (_URL_BASE, build_name, build_number)
     if _BUILDS[build_name]:  # The build has matrix, such as gRPC_master.
         build_result['matrix'] = _process_matrix(build, url_base)
@@ -239,7 +245,7 @@ for build_name in _BUILDS.keys() if 'all' in args.builds else args.builds:
         build_stat = _process_build(json_url, console_url)
         build_result.update(build_stat)
     rows = [big_query_utils.make_row(build_number, build_result)]
-    if not big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID, build_name,
-                                       rows):
+    if not big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
+                                       build_name, rows):
         print('====> Error uploading result to bigquery.')
         sys.exit(1)
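
The changes in this file are purely mechanical: yapf reflows dict literals, argument lists, and long calls to fit its column limit. The same transformation can be reproduced on a small dict literal like _BUILDS with yapf's FormatCode API (illustrative snippet, not part of this commit; the style name is an assumption):

    # Illustration: yapf expanding a dict literal, as in the _BUILDS change above.
    from yapf.yapflib.yapf_api import FormatCode

    SOURCE = ("_BUILDS = {'gRPC_interop_master': True,\n"
              "           'gRPC_master_linux': True,\n"
              "           'gRPC_master_macos': True}\n")

    # Recent yapf versions return (formatted_text, changed).
    formatted, changed = FormatCode(SOURCE, style_config='google')
    print(formatted)
    # Expected shape: one key per line, closing brace on its own line, e.g.
    # _BUILDS = {
    #     'gRPC_interop_master': True,
    #     ...
    # }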

--- a/tools/run_tests/run_interop_tests.py
+++ b/tools/run_tests/run_interop_tests.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 """Run interop (cross-language) tests in parallel."""

 from __future__ import print_function
@@ -49,18 +48,20 @@ os.chdir(ROOT)


 _DEFAULT_SERVER_PORT = 8080

-_SKIP_CLIENT_COMPRESSION = ['client_compressed_unary',
-                            'client_compressed_streaming']
+_SKIP_CLIENT_COMPRESSION = [
+    'client_compressed_unary', 'client_compressed_streaming'
+]

-_SKIP_SERVER_COMPRESSION = ['server_compressed_unary',
-                            'server_compressed_streaming']
+_SKIP_SERVER_COMPRESSION = [
+    'server_compressed_unary', 'server_compressed_streaming'
+]

 _SKIP_COMPRESSION = _SKIP_CLIENT_COMPRESSION + _SKIP_SERVER_COMPRESSION

-_SKIP_ADVANCED = ['status_code_and_message',
-                  'custom_metadata',
-                  'unimplemented_method',
-                  'unimplemented_service']
+_SKIP_ADVANCED = [
+    'status_code_and_message', 'custom_metadata', 'unimplemented_method',
+    'unimplemented_service'
+]

 _TEST_TIMEOUT = 3 * 60
@ -178,7 +179,9 @@ class JavaLanguage:
return ['./run-test-client.sh'] + args return ['./run-test-client.sh'] + args
def client_cmd_http2interop(self, args): def client_cmd_http2interop(self, args):
return ['./interop-testing/build/install/grpc-interop-testing/bin/http2-client'] + args return [
'./interop-testing/build/install/grpc-interop-testing/bin/http2-client'
] + args
def cloud_to_prod_env(self): def cloud_to_prod_env(self):
return {} return {}
@ -254,12 +257,14 @@ class GoLanguage:
def __str__(self): def __str__(self):
return 'go' return 'go'
class Http2Server: class Http2Server:
"""Represents the HTTP/2 Interop Test server """Represents the HTTP/2 Interop Test server
This pretends to be a language in order to be built and run, but really it This pretends to be a language in order to be built and run, but really it
isn't. isn't.
""" """
def __init__(self): def __init__(self):
self.server_cwd = None self.server_cwd = None
self.safename = str(self) self.safename = str(self)
@ -282,12 +287,14 @@ class Http2Server:
def __str__(self): def __str__(self):
return 'http2' return 'http2'
class Http2Client: class Http2Client:
"""Represents the HTTP/2 Interop Test """Represents the HTTP/2 Interop Test
This pretends to be a language in order to be built and run, but really it This pretends to be a language in order to be built and run, but really it
isn't. isn't.
""" """
def __init__(self): def __init__(self):
self.client_cwd = None self.client_cwd = None
self.safename = str(self) self.safename = str(self)
@ -310,6 +317,7 @@ class Http2Client:
def __str__(self): def __str__(self):
return 'http2' return 'http2'
class NodeLanguage: class NodeLanguage:
def __init__(self): def __init__(self):
@ -318,17 +326,21 @@ class NodeLanguage:
self.safename = str(self) self.safename = str(self)
def client_cmd(self, args): def client_cmd(self, args):
return ['packages/grpc-native-core/deps/grpc/tools/run_tests/interop/with_nvm.sh', return [
'packages/grpc-native-core/deps/grpc/tools/run_tests/interop/with_nvm.sh',
'node', '--require', './test/fixtures/native_native', 'node', '--require', './test/fixtures/native_native',
'test/interop/interop_client.js'] + args 'test/interop/interop_client.js'
] + args
def cloud_to_prod_env(self): def cloud_to_prod_env(self):
return {} return {}
def server_cmd(self, args): def server_cmd(self, args):
return ['packages/grpc-native-core/deps/grpc/tools/run_tests/interop/with_nvm.sh', return [
'packages/grpc-native-core/deps/grpc/tools/run_tests/interop/with_nvm.sh',
'node', '--require', './test/fixtures/native_native', 'node', '--require', './test/fixtures/native_native',
'test/interop/interop_server.js'] + args 'test/interop/interop_server.js'
] + args
def global_env(self): def global_env(self):
return {} return {}
@ -392,6 +404,7 @@ class PHP7Language:
def __str__(self): def __str__(self):
return 'php7' return 'php7'
class ObjcLanguage: class ObjcLanguage:
def __init__(self): def __init__(self):
@ -426,6 +439,7 @@ class ObjcLanguage:
def __str__(self): def __str__(self):
return 'objc' return 'objc'
class RubyLanguage: class RubyLanguage:
def __init__(self): def __init__(self):
@ -434,15 +448,19 @@ class RubyLanguage:
self.safename = str(self) self.safename = str(self)
def client_cmd(self, args): def client_cmd(self, args):
return ['tools/run_tests/interop/with_rvm.sh', return [
'ruby', 'src/ruby/pb/test/client.rb'] + args 'tools/run_tests/interop/with_rvm.sh', 'ruby',
'src/ruby/pb/test/client.rb'
] + args
def cloud_to_prod_env(self): def cloud_to_prod_env(self):
return {} return {}
def server_cmd(self, args): def server_cmd(self, args):
return ['tools/run_tests/interop/with_rvm.sh', return [
'ruby', 'src/ruby/pb/test/server.rb'] + args 'tools/run_tests/interop/with_rvm.sh', 'ruby',
'src/ruby/pb/test/server.rb'
] + args
def global_env(self): def global_env(self):
return {} return {}
@ -456,6 +474,7 @@ class RubyLanguage:
def __str__(self): def __str__(self):
return 'ruby' return 'ruby'
class PythonLanguage: class PythonLanguage:
def __init__(self): def __init__(self):
@ -466,15 +485,13 @@ class PythonLanguage:
def client_cmd(self, args): def client_cmd(self, args):
return [ return [
'py27/bin/python', 'py27/bin/python', 'src/python/grpcio_tests/setup.py',
'src/python/grpcio_tests/setup.py', 'run_interop', '--client', '--args="{}"'.format(' '.join(args))
'run_interop',
'--client',
'--args="{}"'.format(' '.join(args))
] ]
def client_cmd_http2interop(self, args): def client_cmd_http2interop(self, args):
return [ 'py27/bin/python', return [
'py27/bin/python',
'src/python/grpcio_tests/tests/http2/negative_http2_client.py', 'src/python/grpcio_tests/tests/http2/negative_http2_client.py',
] + args ] + args
@ -483,16 +500,15 @@ class PythonLanguage:
def server_cmd(self, args): def server_cmd(self, args):
return [ return [
'py27/bin/python', 'py27/bin/python', 'src/python/grpcio_tests/setup.py',
'src/python/grpcio_tests/setup.py', 'run_interop', '--server', '--args="{}"'.format(' '.join(args))
'run_interop',
'--server',
'--args="{}"'.format(' '.join(args))
] ]
def global_env(self): def global_env(self):
return {'LD_LIBRARY_PATH': '{}/libs/opt'.format(DOCKER_WORKDIR_ROOT), return {
'PYTHONPATH': '{}/src/python/gens'.format(DOCKER_WORKDIR_ROOT)} 'LD_LIBRARY_PATH': '{}/libs/opt'.format(DOCKER_WORKDIR_ROOT),
'PYTHONPATH': '{}/src/python/gens'.format(DOCKER_WORKDIR_ROOT)
}
def unimplemented_test_cases(self): def unimplemented_test_cases(self):
return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING return _SKIP_COMPRESSION + _SKIP_DATA_FRAME_PADDING
@ -520,33 +536,47 @@ _LANGUAGES = {
} }
# languages supported as cloud_to_cloud servers # languages supported as cloud_to_cloud servers
_SERVERS = ['c++', 'node', 'csharp', 'csharpcoreclr', 'java', 'go', 'ruby', 'python'] _SERVERS = [
'c++', 'node', 'csharp', 'csharpcoreclr', 'java', 'go', 'ruby', 'python'
]
_TEST_CASES = ['large_unary', 'empty_unary', 'ping_pong', _TEST_CASES = [
'empty_stream', 'client_streaming', 'server_streaming', 'large_unary', 'empty_unary', 'ping_pong', 'empty_stream',
'cancel_after_begin', 'cancel_after_first_response', 'client_streaming', 'server_streaming', 'cancel_after_begin',
'timeout_on_sleeping_server', 'custom_metadata', 'cancel_after_first_response', 'timeout_on_sleeping_server',
'status_code_and_message', 'unimplemented_method', 'custom_metadata', 'status_code_and_message', 'unimplemented_method',
'client_compressed_unary', 'server_compressed_unary', 'client_compressed_unary', 'server_compressed_unary',
'client_compressed_streaming', 'server_compressed_streaming', 'client_compressed_streaming', 'server_compressed_streaming',
'unimplemented_service'] 'unimplemented_service'
]
_AUTH_TEST_CASES = ['compute_engine_creds', 'jwt_token_creds', _AUTH_TEST_CASES = [
'oauth2_auth_token', 'per_rpc_creds'] 'compute_engine_creds', 'jwt_token_creds', 'oauth2_auth_token',
'per_rpc_creds'
]
_HTTP2_TEST_CASES = ['tls', 'framing'] _HTTP2_TEST_CASES = ['tls', 'framing']
_HTTP2_SERVER_TEST_CASES = ['rst_after_header', 'rst_after_data', 'rst_during_data', _HTTP2_SERVER_TEST_CASES = [
'goaway', 'ping', 'max_streams', 'data_frame_padding', 'no_df_padding_sanity_test'] 'rst_after_header', 'rst_after_data', 'rst_during_data', 'goaway', 'ping',
'max_streams', 'data_frame_padding', 'no_df_padding_sanity_test'
]
_GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES = { 'data_frame_padding': 'large_unary', 'no_df_padding_sanity_test': 'large_unary' } _GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES = {
'data_frame_padding': 'large_unary',
'no_df_padding_sanity_test': 'large_unary'
}
_HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS = _GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES.keys() _HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS = _GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES.keys(
)
_LANGUAGES_WITH_HTTP2_CLIENTS_FOR_HTTP2_SERVER_TEST_CASES = ['java', 'go', 'python', 'c++'] _LANGUAGES_WITH_HTTP2_CLIENTS_FOR_HTTP2_SERVER_TEST_CASES = [
'java', 'go', 'python', 'c++'
]
DOCKER_WORKDIR_ROOT = '/var/local/git/grpc' DOCKER_WORKDIR_ROOT = '/var/local/git/grpc'
def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None): def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None):
"""Wraps given cmdline array to create 'docker run' cmdline from it.""" """Wraps given cmdline array to create 'docker run' cmdline from it."""
docker_cmdline = ['docker', 'run', '-i', '--rm=true'] docker_cmdline = ['docker', 'run', '-i', '--rm=true']
@ -612,7 +642,10 @@ def auth_options(language, test_case):
default_account_arg = '--default_service_account=830293263384-compute@developer.gserviceaccount.com' default_account_arg = '--default_service_account=830293263384-compute@developer.gserviceaccount.com'
if test_case in ['jwt_token_creds', 'per_rpc_creds', 'oauth2_auth_token']: if test_case in ['jwt_token_creds', 'per_rpc_creds', 'oauth2_auth_token']:
if language in ['csharp', 'csharpcoreclr', 'node', 'php', 'php7', 'python', 'ruby']: if language in [
'csharp', 'csharpcoreclr', 'node', 'php', 'php7', 'python',
'ruby'
]:
env['GOOGLE_APPLICATION_CREDENTIALS'] = key_filepath env['GOOGLE_APPLICATION_CREDENTIALS'] = key_filepath
else: else:
cmdargs += [key_file_arg] cmdargs += [key_file_arg]
@ -640,17 +673,20 @@ def _job_kill_handler(job):
time.sleep(2) time.sleep(2)
def cloud_to_prod_jobspec(language, test_case, server_host_name, def cloud_to_prod_jobspec(language,
server_host_detail, docker_image=None, auth=False, test_case,
server_host_name,
server_host_detail,
docker_image=None,
auth=False,
manual_cmd_log=None): manual_cmd_log=None):
"""Creates jobspec for cloud-to-prod interop test""" """Creates jobspec for cloud-to-prod interop test"""
container_name = None container_name = None
cmdargs = [ cmdargs = [
'--server_host=%s' % server_host_detail[0], '--server_host=%s' % server_host_detail[0],
'--server_host_override=%s' % server_host_detail[1], '--server_host_override=%s' % server_host_detail[1],
'--server_port=443', '--server_port=443', '--use_tls=true', '--test_case=%s' % test_case
'--use_tls=true', ]
'--test_case=%s' % test_case]
environ = dict(language.cloud_to_prod_env(), **language.global_env()) environ = dict(language.cloud_to_prod_env(), **language.global_env())
if auth: if auth:
auth_cmdargs, auth_env = auth_options(language, test_case) auth_cmdargs, auth_env = auth_options(language, test_case)
@ -662,15 +698,16 @@ def cloud_to_prod_jobspec(language, test_case, server_host_name,
if docker_image: if docker_image:
container_name = dockerjob.random_name('interop_client_%s' % container_name = dockerjob.random_name('interop_client_%s' %
language.safename) language.safename)
cmdline = docker_run_cmdline(cmdline, cmdline = docker_run_cmdline(
cmdline,
image=docker_image, image=docker_image,
cwd=cwd, cwd=cwd,
environ=environ, environ=environ,
docker_args=['--net=host', docker_args=['--net=host', '--name=%s' % container_name])
'--name=%s' % container_name])
if manual_cmd_log is not None: if manual_cmd_log is not None:
if manual_cmd_log == []: if manual_cmd_log == []:
manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' % docker_image) manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' %
docker_image)
manual_cmd_log.append(manual_cmdline(cmdline, docker_image)) manual_cmd_log.append(manual_cmdline(cmdline, docker_image))
cwd = None cwd = None
environ = None environ = None
@ -691,8 +728,13 @@ def cloud_to_prod_jobspec(language, test_case, server_host_name,
return test_job return test_job
def cloud_to_cloud_jobspec(language, test_case, server_name, server_host, def cloud_to_cloud_jobspec(language,
server_port, docker_image=None, insecure=False, test_case,
server_name,
server_host,
server_port,
docker_image=None,
insecure=False,
manual_cmd_log=None): manual_cmd_log=None):
"""Creates jobspec for cloud-to-cloud interop test""" """Creates jobspec for cloud-to-cloud interop test"""
interop_only_options = [ interop_only_options = [
@ -703,9 +745,11 @@ def cloud_to_cloud_jobspec(language, test_case, server_name, server_host,
client_test_case = test_case client_test_case = test_case
if test_case in _HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS: if test_case in _HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS:
client_test_case = _GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES[test_case] client_test_case = _GRPC_CLIENT_TEST_CASES_FOR_HTTP2_SERVER_TEST_CASES[
test_case]
if client_test_case in language.unimplemented_test_cases(): if client_test_case in language.unimplemented_test_cases():
print('asking client %s to run unimplemented test case %s' % (repr(language), client_test_case)) print('asking client %s to run unimplemented test case %s' %
(repr(language), client_test_case))
sys.exit(1) sys.exit(1)
common_options = [ common_options = [
@ -720,25 +764,29 @@ def cloud_to_cloud_jobspec(language, test_case, server_name, server_host,
cmdline = bash_cmdline(language.client_cmd(client_options)) cmdline = bash_cmdline(language.client_cmd(client_options))
cwd = language.client_cwd cwd = language.client_cwd
else: else:
cmdline = bash_cmdline(language.client_cmd_http2interop(common_options)) cmdline = bash_cmdline(
language.client_cmd_http2interop(common_options))
cwd = language.http2_cwd cwd = language.http2_cwd
else: else:
cmdline = bash_cmdline(language.client_cmd(common_options+interop_only_options)) cmdline = bash_cmdline(
language.client_cmd(common_options + interop_only_options))
cwd = language.client_cwd cwd = language.client_cwd
environ = language.global_env() environ = language.global_env()
if docker_image and language.safename != 'objc': if docker_image and language.safename != 'objc':
# we can't run client in docker for objc. # we can't run client in docker for objc.
container_name = dockerjob.random_name('interop_client_%s' % language.safename) container_name = dockerjob.random_name('interop_client_%s' %
cmdline = docker_run_cmdline(cmdline, language.safename)
cmdline = docker_run_cmdline(
cmdline,
image=docker_image, image=docker_image,
environ=environ, environ=environ,
cwd=cwd, cwd=cwd,
docker_args=['--net=host', docker_args=['--net=host', '--name=%s' % container_name])
'--name=%s' % container_name])
if manual_cmd_log is not None: if manual_cmd_log is not None:
if manual_cmd_log == []: if manual_cmd_log == []:
manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' % docker_image) manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' %
docker_image)
manual_cmd_log.append(manual_cmdline(cmdline, docker_image)) manual_cmd_log.append(manual_cmdline(cmdline, docker_image))
cwd = None cwd = None
@ -759,10 +807,13 @@ def cloud_to_cloud_jobspec(language, test_case, server_name, server_host,
def server_jobspec(language, docker_image, insecure=False, manual_cmd_log=None): def server_jobspec(language, docker_image, insecure=False, manual_cmd_log=None):
"""Create jobspec for running a server""" """Create jobspec for running a server"""
container_name = dockerjob.random_name('interop_server_%s' % language.safename) container_name = dockerjob.random_name('interop_server_%s' %
language.safename)
cmdline = bash_cmdline( cmdline = bash_cmdline(
language.server_cmd(['--port=%s' % _DEFAULT_SERVER_PORT, language.server_cmd([
'--use_tls=%s' % ('false' if insecure else 'true')])) '--port=%s' % _DEFAULT_SERVER_PORT, '--use_tls=%s' % (
'false' if insecure else 'true')
]))
environ = language.global_env() environ = language.global_env()
docker_args = ['--name=%s' % container_name] docker_args = ['--name=%s' % container_name]
if language.safename == 'http2': if language.safename == 'http2':
@ -783,8 +834,8 @@ def server_jobspec(language, docker_image, insecure=False, manual_cmd_log=None):
# command line. # command line.
docker_args += [ docker_args += [
'--health-cmd=python test/http2_test/http2_server_health_check.py ' '--health-cmd=python test/http2_test/http2_server_health_check.py '
'--server_host=%s --server_port=%d' '--server_host=%s --server_port=%d' %
% ('localhost', _DEFAULT_SERVER_PORT), ('localhost', _DEFAULT_SERVER_PORT),
'--health-interval=1s', '--health-interval=1s',
'--health-retries=5', '--health-retries=5',
'--health-timeout=10s', '--health-timeout=10s',
@ -793,14 +844,16 @@ def server_jobspec(language, docker_image, insecure=False, manual_cmd_log=None):
else: else:
docker_args += ['-p', str(_DEFAULT_SERVER_PORT)] docker_args += ['-p', str(_DEFAULT_SERVER_PORT)]
docker_cmdline = docker_run_cmdline(cmdline, docker_cmdline = docker_run_cmdline(
cmdline,
image=docker_image, image=docker_image,
cwd=language.server_cwd, cwd=language.server_cwd,
environ=environ, environ=environ,
docker_args=docker_args) docker_args=docker_args)
if manual_cmd_log is not None: if manual_cmd_log is not None:
if manual_cmd_log == []: if manual_cmd_log == []:
manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' % docker_image) manual_cmd_log.append('echo "Testing ${docker_image:=%s}"' %
docker_image)
manual_cmd_log.append(manual_cmdline(docker_cmdline, docker_image)) manual_cmd_log.append(manual_cmdline(docker_cmdline, docker_image))
server_job = jobset.JobSpec( server_job = jobset.JobSpec(
cmdline=docker_cmdline, cmdline=docker_cmdline,
@ -815,8 +868,10 @@ def build_interop_image_jobspec(language, tag=None):
"""Creates jobspec for building interop docker image for a language""" """Creates jobspec for building interop docker image for a language"""
if not tag: if not tag:
tag = 'grpc_interop_%s:%s' % (language.safename, uuid.uuid4()) tag = 'grpc_interop_%s:%s' % (language.safename, uuid.uuid4())
env = {'INTEROP_IMAGE': tag, env = {
'BASE_NAME': 'grpc_interop_%s' % language.safename} 'INTEROP_IMAGE': tag,
'BASE_NAME': 'grpc_interop_%s' % language.safename
}
if not args.travis: if not args.travis:
env['TTY_FLAG'] = '-t' env['TTY_FLAG'] = '-t'
# This env variable is used to get around the github rate limit # This env variable is used to get around the github rate limit
@ -861,126 +916,141 @@ def aggregate_http2_results(stdout):
'percent': 1.0 * passed / (passed + failed) 'percent': 1.0 * passed / (passed + failed)
} }
# A dictionary of prod servers to test. # A dictionary of prod servers to test.
# Format: server_name: (server_host, server_host_override, errors_allowed) # Format: server_name: (server_host, server_host_override, errors_allowed)
# TODO(adelez): implement logic for errors_allowed where if the indicated tests # TODO(adelez): implement logic for errors_allowed where if the indicated tests
# fail, they don't impact the overall test result. # fail, they don't impact the overall test result.
prod_servers = { prod_servers = {
'default': ('216.239.32.254', 'default': ('216.239.32.254', 'grpc-test.sandbox.googleapis.com', False),
'grpc-test.sandbox.googleapis.com', False), 'gateway_v2': ('216.239.32.254', 'grpc-test2.sandbox.googleapis.com', True),
'gateway_v2': ('216.239.32.254',
'grpc-test2.sandbox.googleapis.com', True),
'cloud_gateway': ('216.239.32.255', 'grpc-test.sandbox.googleapis.com', 'cloud_gateway': ('216.239.32.255', 'grpc-test.sandbox.googleapis.com',
False), False),
'cloud_gateway_v2': ('216.239.32.255', 'grpc-test2.sandbox.googleapis.com', 'cloud_gateway_v2': ('216.239.32.255', 'grpc-test2.sandbox.googleapis.com',
True), True),
'gateway_v4': ('216.239.32.254', 'gateway_v4': ('216.239.32.254', 'grpc-test4.sandbox.googleapis.com', True),
'grpc-test4.sandbox.googleapis.com', True),
'cloud_gateway_v4': ('216.239.32.255', 'grpc-test4.sandbox.googleapis.com', 'cloud_gateway_v4': ('216.239.32.255', 'grpc-test4.sandbox.googleapis.com',
True), True),
} }
argp = argparse.ArgumentParser(description='Run interop tests.') argp = argparse.ArgumentParser(description='Run interop tests.')
argp.add_argument('-l', '--language', argp.add_argument(
'-l',
'--language',
choices=['all'] + sorted(_LANGUAGES), choices=['all'] + sorted(_LANGUAGES),
nargs='+', nargs='+',
default=['all'], default=['all'],
help='Clients to run. Objc client can be only run on OSX.') help='Clients to run. Objc client can be only run on OSX.')
argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)
argp.add_argument('--cloud_to_prod', argp.add_argument(
'--cloud_to_prod',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Run cloud_to_prod tests.') help='Run cloud_to_prod tests.')
argp.add_argument('--cloud_to_prod_auth', argp.add_argument(
'--cloud_to_prod_auth',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Run cloud_to_prod_auth tests.') help='Run cloud_to_prod_auth tests.')
argp.add_argument('--prod_servers', argp.add_argument(
'--prod_servers',
choices=prod_servers.keys(), choices=prod_servers.keys(),
default=['default'], default=['default'],
nargs='+', nargs='+',
help=('The servers to run cloud_to_prod and ' help=('The servers to run cloud_to_prod and '
'cloud_to_prod_auth tests against.')) 'cloud_to_prod_auth tests against.'))
argp.add_argument('-s', '--server', argp.add_argument(
'-s',
'--server',
choices=['all'] + sorted(_SERVERS), choices=['all'] + sorted(_SERVERS),
nargs='+', nargs='+',
help='Run cloud_to_cloud servers in a separate docker ' + help='Run cloud_to_cloud servers in a separate docker ' +
'image. Servers can only be started automatically if ' + 'image. Servers can only be started automatically if ' +
'--use_docker option is enabled.', '--use_docker option is enabled.',
default=[]) default=[])
argp.add_argument('--override_server', argp.add_argument(
'--override_server',
action='append', action='append',
type=lambda kv: kv.split('='), type=lambda kv: kv.split('='),
help='Use servername=HOST:PORT to explicitly specify a server. E.g. csharp=localhost:50000', help='Use servername=HOST:PORT to explicitly specify a server. E.g. csharp=localhost:50000',
default=[]) default=[])
argp.add_argument('-t', '--travis', argp.add_argument(
default=False, '-t', '--travis', default=False, action='store_const', const=True)
action='store_const', argp.add_argument(
const=True) '-v', '--verbose', default=False, action='store_const', const=True)
argp.add_argument('-v', '--verbose', argp.add_argument(
default=False, '--use_docker',
action='store_const',
const=True)
argp.add_argument('--use_docker',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Run all the interop tests under docker. That provides ' + help='Run all the interop tests under docker. That provides ' +
'additional isolation and prevents the need to install ' + 'additional isolation and prevents the need to install ' +
'language specific prerequisites. Only available on Linux.') 'language specific prerequisites. Only available on Linux.')
argp.add_argument('--allow_flakes', argp.add_argument(
'--allow_flakes',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Allow flaky tests to show as passing (re-runs failed tests up to five times)') help='Allow flaky tests to show as passing (re-runs failed tests up to five times)'
argp.add_argument('--manual_run', )
argp.add_argument(
'--manual_run',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Prepare things for running interop tests manually. ' + help='Prepare things for running interop tests manually. ' +
'Preserve docker images after building them and skip ' 'Preserve docker images after building them and skip '
'actually running the tests. Only print commands to run by ' + 'actually running the tests. Only print commands to run by ' + 'hand.')
'hand.') argp.add_argument(
argp.add_argument('--http2_interop', '--http2_interop',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Enable HTTP/2 client edge case testing. (Bad client, good server)') help='Enable HTTP/2 client edge case testing. (Bad client, good server)')
argp.add_argument('--http2_server_interop', argp.add_argument(
'--http2_server_interop',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Enable HTTP/2 server edge case testing. (Includes positive and negative tests') help='Enable HTTP/2 server edge case testing. (Includes positive and negative tests'
argp.add_argument('--insecure', )
argp.add_argument(
'--insecure',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Whether to use secure channel.') help='Whether to use secure channel.')
argp.add_argument('--internal_ci', argp.add_argument(
'--internal_ci',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help=('Put reports into subdirectories to improve ' help=('Put reports into subdirectories to improve '
'presentation of results by Internal CI.')) 'presentation of results by Internal CI.'))
argp.add_argument('--bq_result_table', argp.add_argument(
'--bq_result_table',
default='', default='',
type=str, type=str,
nargs='?', nargs='?',
help='Upload test results to a specified BQ table.') help='Upload test results to a specified BQ table.')
args = argp.parse_args() args = argp.parse_args()
servers = set(s for s in itertools.chain.from_iterable(_SERVERS servers = set(
if x == 'all' else [x] s
for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x]
for x in args.server)) for x in args.server))
if args.use_docker: if args.use_docker:
if not args.travis: if not args.travis:
print('Seen --use_docker flag, will run interop tests under docker.') print('Seen --use_docker flag, will run interop tests under docker.')
print('') print('')
print('IMPORTANT: The changes you are testing need to be locally committed') print(
print('because only the committed changes in the current branch will be') 'IMPORTANT: The changes you are testing need to be locally committed'
)
print(
'because only the committed changes in the current branch will be')
print('copied to the docker environment.') print('copied to the docker environment.')
time.sleep(5) time.sleep(5)
@ -989,22 +1059,24 @@ if args.manual_run and not args.use_docker:
sys.exit(1) sys.exit(1)
if not args.use_docker and servers: if not args.use_docker and servers:
print('Running interop servers is only supported with --use_docker option enabled.') print(
'Running interop servers is only supported with --use_docker option enabled.'
)
sys.exit(1) sys.exit(1)
# we want to include everything but objc in 'all' # we want to include everything but objc in 'all'
# because objc won't run on non-mac platforms # because objc won't run on non-mac platforms
all_but_objc = set(six.iterkeys(_LANGUAGES)) - set(['objc']) all_but_objc = set(six.iterkeys(_LANGUAGES)) - set(['objc'])
languages = set(_LANGUAGES[l] languages = set(
for l in itertools.chain.from_iterable( _LANGUAGES[l]
all_but_objc if x == 'all' else [x] for l in itertools.chain.from_iterable(all_but_objc if x == 'all' else [x]
for x in args.language)) for x in args.language))
languages_http2_clients_for_http2_server_interop = set() languages_http2_clients_for_http2_server_interop = set()
if args.http2_server_interop: if args.http2_server_interop:
languages_http2_clients_for_http2_server_interop = set( languages_http2_clients_for_http2_server_interop = set(
_LANGUAGES[l] for l in _LANGUAGES_WITH_HTTP2_CLIENTS_FOR_HTTP2_SERVER_TEST_CASES _LANGUAGES[l]
for l in _LANGUAGES_WITH_HTTP2_CLIENTS_FOR_HTTP2_SERVER_TEST_CASES
if 'all' in args.language or l in args.language) if 'all' in args.language or l in args.language)
http2Interop = Http2Client() if args.http2_interop else None http2Interop = Http2Client() if args.http2_interop else None
@ -1014,7 +1086,8 @@ docker_images={}
if args.use_docker: if args.use_docker:
# languages for which to build docker images # languages for which to build docker images
languages_to_build = set( languages_to_build = set(
_LANGUAGES[k] for k in set([str(l) for l in languages] + [s for s in servers])) _LANGUAGES[k]
for k in set([str(l) for l in languages] + [s for s in servers]))
languages_to_build = languages_to_build | languages_http2_clients_for_http2_server_interop languages_to_build = languages_to_build | languages_http2_clients_for_http2_server_interop
if args.http2_interop: if args.http2_interop:
@ -1033,17 +1106,22 @@ if args.use_docker:
build_jobs.append(job) build_jobs.append(job)
if build_jobs: if build_jobs:
jobset.message('START', 'Building interop docker images.', do_newline=True) jobset.message(
'START', 'Building interop docker images.', do_newline=True)
if args.verbose: if args.verbose:
print('Jobs to run: \n%s\n' % '\n'.join(str(j) for j in build_jobs)) print('Jobs to run: \n%s\n' % '\n'.join(str(j) for j in build_jobs))
num_failures, _ = jobset.run( num_failures, _ = jobset.run(
build_jobs, newline_on_success=True, maxjobs=args.jobs) build_jobs, newline_on_success=True, maxjobs=args.jobs)
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', 'All docker images built successfully.', jobset.message(
'SUCCESS',
'All docker images built successfully.',
do_newline=True) do_newline=True)
else: else:
jobset.message('FAILED', 'Failed to build interop docker images.', jobset.message(
'FAILED',
'Failed to build interop docker images.',
do_newline=True) do_newline=True)
for image in six.itervalues(docker_images): for image in six.itervalues(docker_images):
dockerjob.remove_image(image, skip_nonexistent=True) dockerjob.remove_image(image, skip_nonexistent=True)
@ -1058,12 +1136,16 @@ server_addresses = {}
try: try:
for s in servers: for s in servers:
lang = str(s) lang = str(s)
spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), spec = server_jobspec(
args.insecure, manual_cmd_log=server_manual_cmd_log) _LANGUAGES[lang],
docker_images.get(lang),
args.insecure,
manual_cmd_log=server_manual_cmd_log)
if not args.manual_run: if not args.manual_run:
job = dockerjob.DockerJob(spec) job = dockerjob.DockerJob(spec)
server_jobs[lang] = job server_jobs[lang] = job
server_addresses[lang] = ('localhost', job.mapped_port(_DEFAULT_SERVER_PORT)) server_addresses[lang] = ('localhost',
job.mapped_port(_DEFAULT_SERVER_PORT))
else: else:
# don't run the server, set server port to a placeholder value # don't run the server, set server port to a placeholder value
server_addresses[lang] = ('localhost', '${SERVER_PORT}') server_addresses[lang] = ('localhost', '${SERVER_PORT}')
@ -1072,7 +1154,9 @@ try:
if args.http2_server_interop: if args.http2_server_interop:
# launch a HTTP2 server emulator that creates edge cases # launch a HTTP2 server emulator that creates edge cases
lang = str(http2InteropServer) lang = str(http2InteropServer)
spec = server_jobspec(http2InteropServer, docker_images.get(lang), spec = server_jobspec(
http2InteropServer,
docker_images.get(lang),
manual_cmd_log=server_manual_cmd_log) manual_cmd_log=server_manual_cmd_log)
if not args.manual_run: if not args.manual_run:
http2_server_job = dockerjob.DockerJob(spec) http2_server_job = dockerjob.DockerJob(spec)
@ -1091,7 +1175,9 @@ try:
if not test_case in language.unimplemented_test_cases(): if not test_case in language.unimplemented_test_cases():
if not test_case in _SKIP_ADVANCED + _SKIP_COMPRESSION: if not test_case in _SKIP_ADVANCED + _SKIP_COMPRESSION:
test_job = cloud_to_prod_jobspec( test_job = cloud_to_prod_jobspec(
language, test_case, server_host_name, language,
test_case,
server_host_name,
prod_servers[server_host_name], prod_servers[server_host_name],
docker_image=docker_images.get(str(language)), docker_image=docker_images.get(str(language)),
manual_cmd_log=client_manual_cmd_log) manual_cmd_log=client_manual_cmd_log)
@ -1100,7 +1186,9 @@ try:
if args.http2_interop: if args.http2_interop:
for test_case in _HTTP2_TEST_CASES: for test_case in _HTTP2_TEST_CASES:
test_job = cloud_to_prod_jobspec( test_job = cloud_to_prod_jobspec(
http2Interop, test_case, server_host_name, http2Interop,
test_case,
server_host_name,
prod_servers[server_host_name], prod_servers[server_host_name],
docker_image=docker_images.get(str(http2Interop)), docker_image=docker_images.get(str(http2Interop)),
manual_cmd_log=client_manual_cmd_log) manual_cmd_log=client_manual_cmd_log)
@ -1114,9 +1202,12 @@ try:
for test_case in _AUTH_TEST_CASES: for test_case in _AUTH_TEST_CASES:
if not test_case in language.unimplemented_test_cases(): if not test_case in language.unimplemented_test_cases():
test_job = cloud_to_prod_jobspec( test_job = cloud_to_prod_jobspec(
language, test_case, server_host_name, language,
test_case,
server_host_name,
prod_servers[server_host_name], prod_servers[server_host_name],
docker_image=docker_images.get(str(language)), auth=True, docker_image=docker_images.get(str(language)),
auth=True,
manual_cmd_log=client_manual_cmd_log) manual_cmd_log=client_manual_cmd_log)
jobs.append(test_job) jobs.append(test_job)
@ -1135,7 +1226,8 @@ try:
for test_case in _TEST_CASES: for test_case in _TEST_CASES:
if not test_case in language.unimplemented_test_cases(): if not test_case in language.unimplemented_test_cases():
if not test_case in skip_server: if not test_case in skip_server:
test_job = cloud_to_cloud_jobspec(language, test_job = cloud_to_cloud_jobspec(
language,
test_case, test_case,
server_name, server_name,
server_host, server_host,
@ -1150,7 +1242,8 @@ try:
if server_name == "go": if server_name == "go":
# TODO(carl-mastrangelo): Reenable after https://github.com/grpc/grpc-go/issues/434 # TODO(carl-mastrangelo): Reenable after https://github.com/grpc/grpc-go/issues/434
continue continue
test_job = cloud_to_cloud_jobspec(http2Interop, test_job = cloud_to_cloud_jobspec(
http2Interop,
test_case, test_case,
server_name, server_name,
server_host, server_host,
@ -1164,12 +1257,14 @@ try:
if not args.manual_run: if not args.manual_run:
http2_server_job.wait_for_healthy(timeout_seconds=600) http2_server_job.wait_for_healthy(timeout_seconds=600)
for language in languages_http2_clients_for_http2_server_interop: for language in languages_http2_clients_for_http2_server_interop:
for test_case in set(_HTTP2_SERVER_TEST_CASES) - set(_HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS): for test_case in set(_HTTP2_SERVER_TEST_CASES) - set(
_HTTP2_SERVER_TEST_CASES_THAT_USE_GRPC_CLIENTS):
offset = sorted(_HTTP2_SERVER_TEST_CASES).index(test_case) offset = sorted(_HTTP2_SERVER_TEST_CASES).index(test_case)
server_port = _DEFAULT_SERVER_PORT + offset server_port = _DEFAULT_SERVER_PORT + offset
if not args.manual_run: if not args.manual_run:
server_port = http2_server_job.mapped_port(server_port) server_port = http2_server_job.mapped_port(server_port)
test_job = cloud_to_cloud_jobspec(language, test_job = cloud_to_cloud_jobspec(
language,
test_case, test_case,
str(http2InteropServer), str(http2InteropServer),
'localhost', 'localhost',
@ -1191,9 +1286,12 @@ try:
if not args.manual_run: if not args.manual_run:
server_port = http2_server_job.mapped_port(server_port) server_port = http2_server_job.mapped_port(server_port)
if not args.insecure: if not args.insecure:
print(('Creating grpc cient to http2 server test case with insecure connection, even though' print((
' args.insecure is False. Http2 test server only supports insecure connections.')) 'Creating grpc cient to http2 server test case with insecure connection, even though'
test_job = cloud_to_cloud_jobspec(language, ' args.insecure is False. Http2 test server only supports insecure connections.'
))
test_job = cloud_to_cloud_jobspec(
language,
test_case, test_case,
str(http2InteropServer), str(http2InteropServer),
'localhost', 'localhost',
@ -1215,7 +1313,9 @@ try:
if args.verbose: if args.verbose:
print('Jobs to run: \n%s\n' % '\n'.join(str(job) for job in jobs)) print('Jobs to run: \n%s\n' % '\n'.join(str(job) for job in jobs))
num_failures, resultset = jobset.run(jobs, newline_on_success=True, num_failures, resultset = jobset.run(
jobs,
newline_on_success=True,
maxjobs=args.jobs, maxjobs=args.jobs,
skip_jobs=args.manual_run) skip_jobs=args.manual_run)
if args.bq_result_table and resultset: if args.bq_result_table and resultset:
@ -1237,14 +1337,14 @@ try:
if "http2" in name: if "http2" in name:
job[0].http2results = aggregate_http2_results(job[0].message) job[0].http2results = aggregate_http2_results(job[0].message)
http2_server_test_cases = ( http2_server_test_cases = (_HTTP2_SERVER_TEST_CASES
_HTTP2_SERVER_TEST_CASES if args.http2_server_interop else []) if args.http2_server_interop else [])
report_utils.render_interop_html_report( report_utils.render_interop_html_report(
set([str(l) for l in languages]), servers, _TEST_CASES, _AUTH_TEST_CASES, set([str(l) for l in languages]), servers, _TEST_CASES,
_HTTP2_TEST_CASES, http2_server_test_cases, resultset, num_failures, _AUTH_TEST_CASES, _HTTP2_TEST_CASES, http2_server_test_cases, resultset,
args.cloud_to_prod_auth or args.cloud_to_prod, args.prod_servers, num_failures, args.cloud_to_prod_auth or args.cloud_to_prod,
args.http2_interop) args.prod_servers, args.http2_interop)
if num_failures: if num_failures:
sys.exit(1) sys.exit(1)
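
Throughout run_interop_tests.py the client and server commands are wrapped into `docker run` invocations by docker_run_cmdline; the yapf pass only reflows those call sites. A rough sketch of that wrapping pattern follows; the helper name and defaults below are illustrative, not grpc's exact implementation:

    # Sketch of wrapping a command line in 'docker run', in the spirit of
    # docker_run_cmdline above. Names and defaults are illustrative.
    def wrap_in_docker_run(cmdline, image, docker_args=None, cwd=None, environ=None):
        docker_cmdline = ['docker', 'run', '-i', '--rm=true']
        if environ:
            for k, v in environ.items():
                docker_cmdline += ['-e', '%s=%s' % (k, v)]  # pass environment into the container
        if cwd:
            docker_cmdline += ['-w', cwd]  # working directory inside the container
        if docker_args:
            docker_cmdline += docker_args  # e.g. ['--net=host', '--name=interop_client_python']
        return docker_cmdline + [image] + cmdline

    # Example of the kind of command the interop runner ends up building:
    print(wrap_in_docker_run(
        ['bash', '-c', './run_client.sh --test_case=large_unary'],
        image='grpc_interop_python:some-uuid',
        docker_args=['--net=host', '--name=interop_client_python'],
        environ={'LD_LIBRARY_PATH': '/var/local/git/grpc/libs/opt'}))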

--- a/tools/run_tests/run_microbenchmark.py
+++ b/tools/run_tests/run_microbenchmark.py
@@ -23,7 +23,10 @@ import argparse
 import python_utils.jobset as jobset
 import python_utils.start_port_server as start_port_server

-sys.path.append(os.path.join(os.path.dirname(sys.argv[0]), '..', 'profiling', 'microbenchmarks', 'bm_diff'))
+sys.path.append(
+    os.path.join(
+        os.path.dirname(sys.argv[0]), '..', 'profiling', 'microbenchmarks',
+        'bm_diff'))
 import bm_constants

 flamegraph_dir = os.path.join(os.path.expanduser('~'), 'FlameGraph')
@ -34,6 +37,7 @@ if not os.path.exists('reports'):
start_port_server.start_port_server() start_port_server.start_port_server()
def fnize(s): def fnize(s):
out = '' out = ''
for c in s: for c in s:
@ -44,6 +48,7 @@ def fnize(s):
out += c out += c
return out return out
# index html # index html
index_html = """ index_html = """
<html> <html>
@ -53,19 +58,23 @@ index_html = """
<body> <body>
""" """
def heading(name): def heading(name):
global index_html global index_html
index_html += "<h1>%s</h1>\n" % name index_html += "<h1>%s</h1>\n" % name
def link(txt, tgt): def link(txt, tgt):
global index_html global index_html
index_html += "<p><a href=\"%s\">%s</a></p>\n" % ( index_html += "<p><a href=\"%s\">%s</a></p>\n" % (
cgi.escape(tgt, quote=True), cgi.escape(txt)) cgi.escape(tgt, quote=True), cgi.escape(txt))
def text(txt): def text(txt):
global index_html global index_html
index_html += "<p><pre>%s</pre></p>\n" % cgi.escape(txt) index_html += "<p><pre>%s</pre></p>\n" % cgi.escape(txt)
def collect_latency(bm_name, args): def collect_latency(bm_name, args):
"""generate latency profiles""" """generate latency profiles"""
benchmarks = [] benchmarks = []
@ -73,23 +82,30 @@ def collect_latency(bm_name, args):
cleanup = [] cleanup = []
heading('Latency Profiles: %s' % bm_name) heading('Latency Profiles: %s' % bm_name)
subprocess.check_call( subprocess.check_call([
['make', bm_name, 'make', bm_name, 'CONFIG=basicprof', '-j',
'CONFIG=basicprof', '-j', '%d' % multiprocessing.cpu_count()]) '%d' % multiprocessing.cpu_count()
for line in subprocess.check_output(['bins/basicprof/%s' % bm_name, ])
'--benchmark_list_tests']).splitlines(): for line in subprocess.check_output(
['bins/basicprof/%s' % bm_name, '--benchmark_list_tests']).splitlines():
link(line, '%s.txt' % fnize(line)) link(line, '%s.txt' % fnize(line))
benchmarks.append( benchmarks.append(
jobset.JobSpec(['bins/basicprof/%s' % bm_name, jobset.JobSpec(
'--benchmark_filter=^%s$' % line, [
'--benchmark_min_time=0.05'], 'bins/basicprof/%s' % bm_name, '--benchmark_filter=^%s$' %
line, '--benchmark_min_time=0.05'
],
environ={'LATENCY_TRACE': '%s.trace' % fnize(line)}, environ={'LATENCY_TRACE': '%s.trace' % fnize(line)},
shortname='profile-%s' % fnize(line))) shortname='profile-%s' % fnize(line)))
profile_analysis.append( profile_analysis.append(
jobset.JobSpec([sys.executable, jobset.JobSpec(
[
sys.executable,
'tools/profiling/latency_profile/profile_analyzer.py', 'tools/profiling/latency_profile/profile_analyzer.py',
'--source', '%s.trace' % fnize(line), '--fmt', 'simple', '--source', '%s.trace' % fnize(line), '--fmt', 'simple',
'--out', 'reports/%s.txt' % fnize(line)], timeout_seconds=20*60, '--out', 'reports/%s.txt' % fnize(line)
],
timeout_seconds=20 * 60,
shortname='analyze-%s' % fnize(line))) shortname='analyze-%s' % fnize(line)))
cleanup.append(jobset.JobSpec(['rm', '%s.trace' % fnize(line)])) cleanup.append(jobset.JobSpec(['rm', '%s.trace' % fnize(line)]))
# periodically flush out the list of jobs: profile_analysis jobs at least # periodically flush out the list of jobs: profile_analysis jobs at least
@ -99,7 +115,8 @@ def collect_latency(bm_name, args):
if len(benchmarks) >= min(16, multiprocessing.cpu_count()): if len(benchmarks) >= min(16, multiprocessing.cpu_count()):
# run up to half the cpu count: each benchmark can use up to two cores # run up to half the cpu count: each benchmark can use up to two cores
# (one for the microbenchmark, one for the data flush) # (one for the microbenchmark, one for the data flush)
jobset.run(benchmarks, maxjobs=max(1, multiprocessing.cpu_count()/2)) jobset.run(
benchmarks, maxjobs=max(1, multiprocessing.cpu_count() / 2))
jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count()) jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
jobset.run(cleanup, maxjobs=multiprocessing.cpu_count()) jobset.run(cleanup, maxjobs=multiprocessing.cpu_count())
benchmarks = [] benchmarks = []
@ -111,27 +128,33 @@ def collect_latency(bm_name, args):
jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count()) jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
jobset.run(cleanup, maxjobs=multiprocessing.cpu_count()) jobset.run(cleanup, maxjobs=multiprocessing.cpu_count())
def collect_perf(bm_name, args): def collect_perf(bm_name, args):
"""generate flamegraphs""" """generate flamegraphs"""
heading('Flamegraphs: %s' % bm_name) heading('Flamegraphs: %s' % bm_name)
subprocess.check_call( subprocess.check_call([
['make', bm_name, 'make', bm_name, 'CONFIG=mutrace', '-j',
'CONFIG=mutrace', '-j', '%d' % multiprocessing.cpu_count()]) '%d' % multiprocessing.cpu_count()
])
benchmarks = [] benchmarks = []
profile_analysis = [] profile_analysis = []
cleanup = [] cleanup = []
for line in subprocess.check_output(['bins/mutrace/%s' % bm_name, for line in subprocess.check_output(
'--benchmark_list_tests']).splitlines(): ['bins/mutrace/%s' % bm_name, '--benchmark_list_tests']).splitlines():
link(line, '%s.svg' % fnize(line)) link(line, '%s.svg' % fnize(line))
benchmarks.append( benchmarks.append(
jobset.JobSpec(['perf', 'record', '-o', '%s-perf.data' % fnize(line), jobset.JobSpec(
'-g', '-F', '997', [
'bins/mutrace/%s' % bm_name, 'perf', 'record', '-o', '%s-perf.data' % fnize(
'--benchmark_filter=^%s$' % line, line), '-g', '-F', '997', 'bins/mutrace/%s' % bm_name,
'--benchmark_min_time=10'], '--benchmark_filter=^%s$' % line, '--benchmark_min_time=10'
],
shortname='perf-%s' % fnize(line))) shortname='perf-%s' % fnize(line)))
profile_analysis.append( profile_analysis.append(
jobset.JobSpec(['tools/run_tests/performance/process_local_perf_flamegraphs.sh'], jobset.JobSpec(
[
'tools/run_tests/performance/process_local_perf_flamegraphs.sh'
],
environ={ environ={
'PERF_BASE_NAME': fnize(line), 'PERF_BASE_NAME': fnize(line),
'OUTPUT_DIR': 'reports', 'OUTPUT_DIR': 'reports',
@ -157,17 +180,21 @@ def collect_perf(bm_name, args):
jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count()) jobset.run(profile_analysis, maxjobs=multiprocessing.cpu_count())
jobset.run(cleanup, maxjobs=multiprocessing.cpu_count()) jobset.run(cleanup, maxjobs=multiprocessing.cpu_count())
def run_summary(bm_name, cfg, base_json_name): def run_summary(bm_name, cfg, base_json_name):
subprocess.check_call( subprocess.check_call([
['make', bm_name, 'make', bm_name, 'CONFIG=%s' % cfg, '-j',
'CONFIG=%s' % cfg, '-j', '%d' % multiprocessing.cpu_count()]) '%d' % multiprocessing.cpu_count()
cmd = ['bins/%s/%s' % (cfg, bm_name), ])
'--benchmark_out=%s.%s.json' % (base_json_name, cfg), cmd = [
'--benchmark_out_format=json'] 'bins/%s/%s' % (cfg, bm_name), '--benchmark_out=%s.%s.json' %
(base_json_name, cfg), '--benchmark_out_format=json'
]
if args.summary_time is not None: if args.summary_time is not None:
cmd += ['--benchmark_min_time=%d' % args.summary_time] cmd += ['--benchmark_min_time=%d' % args.summary_time]
return subprocess.check_output(cmd) return subprocess.check_output(cmd)
def collect_summary(bm_name, args): def collect_summary(bm_name, args):
heading('Summary: %s [no counters]' % bm_name) heading('Summary: %s [no counters]' % bm_name)
text(run_summary(bm_name, 'opt', bm_name)) text(run_summary(bm_name, 'opt', bm_name))
@ -175,10 +202,15 @@ def collect_summary(bm_name, args):
text(run_summary(bm_name, 'counters', bm_name)) text(run_summary(bm_name, 'counters', bm_name))
if args.bigquery_upload: if args.bigquery_upload:
with open('%s.csv' % bm_name, 'w') as f: with open('%s.csv' % bm_name, 'w') as f:
f.write(subprocess.check_output(['tools/profiling/microbenchmarks/bm2bq.py', f.write(
'%s.counters.json' % bm_name, subprocess.check_output([
'%s.opt.json' % bm_name])) 'tools/profiling/microbenchmarks/bm2bq.py',
subprocess.check_call(['bq', 'load', 'microbenchmarks.microbenchmarks', '%s.csv' % bm_name]) '%s.counters.json' % bm_name, '%s.opt.json' % bm_name
]))
subprocess.check_call([
'bq', 'load', 'microbenchmarks.microbenchmarks', '%s.csv' % bm_name
])
collectors = { collectors = {
'latency': collect_latency, 'latency': collect_latency,
@ -187,23 +219,29 @@ collectors = {
} }
argp = argparse.ArgumentParser(description='Collect data from microbenchmarks') argp = argparse.ArgumentParser(description='Collect data from microbenchmarks')
argp.add_argument('-c', '--collect', argp.add_argument(
'-c',
'--collect',
choices=sorted(collectors.keys()), choices=sorted(collectors.keys()),
nargs='*', nargs='*',
default=sorted(collectors.keys()), default=sorted(collectors.keys()),
help='Which collectors should be run against each benchmark') help='Which collectors should be run against each benchmark')
argp.add_argument('-b', '--benchmarks', argp.add_argument(
'-b',
'--benchmarks',
choices=bm_constants._AVAILABLE_BENCHMARK_TESTS, choices=bm_constants._AVAILABLE_BENCHMARK_TESTS,
default=bm_constants._AVAILABLE_BENCHMARK_TESTS, default=bm_constants._AVAILABLE_BENCHMARK_TESTS,
nargs='+', nargs='+',
type=str, type=str,
help='Which microbenchmarks should be run') help='Which microbenchmarks should be run')
argp.add_argument('--bigquery_upload', argp.add_argument(
'--bigquery_upload',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Upload results from summary collection to bigquery') help='Upload results from summary collection to bigquery')
argp.add_argument('--summary_time', argp.add_argument(
'--summary_time',
default=None, default=None,
type=int, type=int,
help='Minimum time to run benchmarks for the summary collection') help='Minimum time to run benchmarks for the summary collection')
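
run_summary above builds each benchmark binary and runs it with Google Benchmark's JSON reporting flags (--benchmark_out, --benchmark_out_format=json), optionally capping runtime with --benchmark_min_time. A trimmed sketch of that invocation follows; the binary path and config name are assumptions:

    # Sketch: run one microbenchmark and capture its JSON report, mirroring run_summary.
    import subprocess

    def run_benchmark_summary(bm_name, cfg='opt', min_time=None):
        cmd = [
            'bins/%s/%s' % (cfg, bm_name),
            '--benchmark_out=%s.%s.json' % (bm_name, cfg),
            '--benchmark_out_format=json',
        ]
        if min_time is not None:
            cmd.append('--benchmark_min_time=%d' % min_time)
        # stdout still carries the human-readable table; the JSON lands in the file above.
        return subprocess.check_output(cmd)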

--- a/tools/run_tests/run_performance_tests.py
+++ b/tools/run_tests/run_performance_tests.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 """Run performance tests locally or remotely."""

 from __future__ import print_function
@@ -37,11 +36,9 @@ import performance.scenario_config as scenario_config
 import python_utils.jobset as jobset
 import python_utils.report_utils as report_utils


-
 _ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
 os.chdir(_ROOT)


-
 _REMOTE_HOST_USERNAME = 'jenkins'
@@ -56,7 +53,8 @@ class QpsWorkerJob:
         self.perf_file_base_name = perf_file_base_name

     def start(self):
-        self._job = jobset.Job(self._spec, newline_on_success=True, travis=True, add_env={})
+        self._job = jobset.Job(
+            self._spec, newline_on_success=True, travis=True, add_env={})

     def is_running(self):
         """Polls a job and returns True if given job is still running."""
self._job = None self._job = None
def create_qpsworker_job(language, shortname=None, port=10000, remote_host=None, perf_cmd=None): def create_qpsworker_job(language,
shortname=None,
port=10000,
remote_host=None,
perf_cmd=None):
cmdline = (language.worker_cmdline() + ['--driver_port=%s' % port]) cmdline = (language.worker_cmdline() + ['--driver_port=%s' % port])
if remote_host: if remote_host:
@ -80,14 +82,19 @@ def create_qpsworker_job(language, shortname=None, port=10000, remote_host=None,
if perf_cmd: if perf_cmd:
perf_file_base_name = '%s-%s' % (host_and_port, shortname) perf_file_base_name = '%s-%s' % (host_and_port, shortname)
# specify -o output file so perf.data gets collected when the worker is stopped # specify -o output file so perf.data gets collected when the worker is stopped
cmdline = perf_cmd + ['-o', '%s-perf.data' % perf_file_base_name] + cmdline cmdline = perf_cmd + ['-o', '%s-perf.data' % perf_file_base_name
] + cmdline
worker_timeout = 3 * 60 worker_timeout = 3 * 60
if remote_host: if remote_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host) user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
ssh_cmd = ['ssh'] ssh_cmd = ['ssh']
cmdline = ['timeout', '%s' % (worker_timeout + 30)] + cmdline cmdline = ['timeout', '%s' % (worker_timeout + 30)] + cmdline
ssh_cmd.extend([str(user_at_host), 'cd ~/performance_workspace/grpc/ && python tools/run_tests/start_port_server.py && %s' % ' '.join(cmdline)]) ssh_cmd.extend([
str(user_at_host),
'cd ~/performance_workspace/grpc/ && python tools/run_tests/start_port_server.py && %s'
% ' '.join(cmdline)
])
cmdline = ssh_cmd cmdline = ssh_cmd
jobspec = jobset.JobSpec( jobspec = jobset.JobSpec(
@ -98,21 +105,28 @@ def create_qpsworker_job(language, shortname=None, port=10000, remote_host=None,
return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name) return QpsWorkerJob(jobspec, language, host_and_port, perf_file_base_name)
def create_scenario_jobspec(scenario_json, workers, remote_host=None, def create_scenario_jobspec(scenario_json,
bq_result_table=None, server_cpu_load=0): workers,
remote_host=None,
bq_result_table=None,
server_cpu_load=0):
"""Runs one scenario using QPS driver.""" """Runs one scenario using QPS driver."""
# setting QPS_WORKERS env variable here makes sure it works with SSH too. # setting QPS_WORKERS env variable here makes sure it works with SSH too.
cmd = 'QPS_WORKERS="%s" ' % ','.join(workers) cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
if bq_result_table: if bq_result_table:
cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
cmd += 'tools/run_tests/performance/run_qps_driver.sh ' cmd += 'tools/run_tests/performance/run_qps_driver.sh '
cmd += '--scenarios_json=%s ' % pipes.quote(json.dumps({'scenarios': [scenario_json]})) cmd += '--scenarios_json=%s ' % pipes.quote(
json.dumps({
'scenarios': [scenario_json]
}))
cmd += '--scenario_result_file=scenario_result.json ' cmd += '--scenario_result_file=scenario_result.json '
if server_cpu_load != 0: if server_cpu_load != 0:
cmd += '--search_param=offered_load --initial_search_value=1000 --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01' % server_cpu_load cmd += '--search_param=offered_load --initial_search_value=1000 --targeted_cpu_load=%d --stride=500 --error_tolerance=0.01' % server_cpu_load
if remote_host: if remote_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host) user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd)) cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host, pipes.quote(cmd))
return jobset.JobSpec( return jobset.JobSpec(
cmdline=[cmd], cmdline=[cmd],
@ -125,10 +139,12 @@ def create_scenario_jobspec(scenario_json, workers, remote_host=None,
def create_quit_jobspec(workers, remote_host=None): def create_quit_jobspec(workers, remote_host=None):
"""Runs quit using QPS driver.""" """Runs quit using QPS driver."""
# setting QPS_WORKERS env variable here makes sure it works with SSH too. # setting QPS_WORKERS env variable here makes sure it works with SSH too.
cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(w.host_and_port for w in workers) cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver --quit' % ','.join(
w.host_and_port for w in workers)
if remote_host: if remote_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host) user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd)) cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host, pipes.quote(cmd))
return jobset.JobSpec( return jobset.JobSpec(
cmdline=[cmd], cmdline=[cmd],
@ -138,7 +154,8 @@ def create_quit_jobspec(workers, remote_host=None):
verbose_success=True) verbose_success=True)
def create_netperf_jobspec(server_host='localhost', client_host=None, def create_netperf_jobspec(server_host='localhost',
client_host=None,
bq_result_table=None): bq_result_table=None):
"""Runs netperf benchmark.""" """Runs netperf benchmark."""
cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
@ -158,7 +175,8 @@ def create_netperf_jobspec(server_host='localhost', client_host=None,
cmd += 'tools/run_tests/performance/run_netperf.sh' cmd += 'tools/run_tests/performance/run_netperf.sh'
if client_host: if client_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host) user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd)) cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host, pipes.quote(cmd))
return jobset.JobSpec( return jobset.JobSpec(
cmdline=[cmd], cmdline=[cmd],
@ -177,20 +195,19 @@ def archive_repo(languages):
cmdline.append('../grpc-go') cmdline.append('../grpc-go')
archive_job = jobset.JobSpec( archive_job = jobset.JobSpec(
cmdline=cmdline, cmdline=cmdline, shortname='archive_repo', timeout_seconds=3 * 60)
shortname='archive_repo',
timeout_seconds=3*60)
jobset.message('START', 'Archiving local repository.', do_newline=True) jobset.message('START', 'Archiving local repository.', do_newline=True)
num_failures, _ = jobset.run( num_failures, _ = jobset.run(
[archive_job], newline_on_success=True, maxjobs=1) [archive_job], newline_on_success=True, maxjobs=1)
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', jobset.message(
'SUCCESS',
'Archive with local repository created successfully.', 'Archive with local repository created successfully.',
do_newline=True) do_newline=True)
else: else:
jobset.message('FAILED', 'Failed to archive local repository.', jobset.message(
do_newline=True) 'FAILED', 'Failed to archive local repository.', do_newline=True)
sys.exit(1) sys.exit(1)
@ -217,16 +234,17 @@ def prepare_remote_hosts(hosts, prepare_local=False):
num_failures, _ = jobset.run( num_failures, _ = jobset.run(
prepare_jobs, newline_on_success=True, maxjobs=10) prepare_jobs, newline_on_success=True, maxjobs=10)
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', jobset.message(
'Prepare step completed successfully.', 'SUCCESS', 'Prepare step completed successfully.', do_newline=True)
do_newline=True)
else: else:
jobset.message('FAILED', 'Failed to prepare remote hosts.', jobset.message(
do_newline=True) 'FAILED', 'Failed to prepare remote hosts.', do_newline=True)
sys.exit(1) sys.exit(1)
def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), build_local=False): def build_on_remote_hosts(hosts,
languages=scenario_config.LANGUAGES.keys(),
build_local=False):
"""Builds performance worker on remote hosts (and maybe also locally).""" """Builds performance worker on remote hosts (and maybe also locally)."""
build_timeout = 15 * 60 build_timeout = 15 * 60
# Kokoro VMs (which are local only) do not have caching, so they need more time to build # Kokoro VMs (which are local only) do not have caching, so they need more time to build
@ -236,15 +254,18 @@ def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), bui
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host) user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, host)
build_jobs.append( build_jobs.append(
jobset.JobSpec( jobset.JobSpec(
cmdline=['tools/run_tests/performance/remote_host_build.sh'] + languages, cmdline=['tools/run_tests/performance/remote_host_build.sh'] +
languages,
shortname='remote_host_build.%s' % host, shortname='remote_host_build.%s' % host,
environ = {'USER_AT_HOST': user_at_host, 'CONFIG': 'opt'}, environ={'USER_AT_HOST': user_at_host,
'CONFIG': 'opt'},
timeout_seconds=build_timeout)) timeout_seconds=build_timeout))
if build_local: if build_local:
# Build locally as well # Build locally as well
build_jobs.append( build_jobs.append(
jobset.JobSpec( jobset.JobSpec(
cmdline=['tools/run_tests/performance/build_performance.sh'] + languages, cmdline=['tools/run_tests/performance/build_performance.sh'] +
languages,
shortname='local_build', shortname='local_build',
environ={'CONFIG': 'opt'}, environ={'CONFIG': 'opt'},
timeout_seconds=local_build_timeout)) timeout_seconds=local_build_timeout))
@ -252,12 +273,9 @@ def build_on_remote_hosts(hosts, languages=scenario_config.LANGUAGES.keys(), bui
num_failures, _ = jobset.run( num_failures, _ = jobset.run(
build_jobs, newline_on_success=True, maxjobs=10) build_jobs, newline_on_success=True, maxjobs=10)
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', jobset.message('SUCCESS', 'Built successfully.', do_newline=True)
'Built successfully.',
do_newline=True)
else: else:
jobset.message('FAILED', 'Build failed.', jobset.message('FAILED', 'Build failed.', do_newline=True)
do_newline=True)
sys.exit(1) sys.exit(1)
@ -273,30 +291,31 @@ def create_qpsworkers(languages, worker_hosts, perf_cmd=None):
# run one worker per remote host (for each language) # run one worker per remote host (for each language)
workers = [(worker_host, 10000) for worker_host in worker_hosts] workers = [(worker_host, 10000) for worker_host in worker_hosts]
return [create_qpsworker_job(language, return [
shortname= 'qps_worker_%s_%s' % (language, create_qpsworker_job(
worker_idx), language,
shortname='qps_worker_%s_%s' % (language, worker_idx),
port=worker[1] + language.worker_port_offset(), port=worker[1] + language.worker_port_offset(),
remote_host=worker[0], remote_host=worker[0],
perf_cmd=perf_cmd) perf_cmd=perf_cmd)
for language in languages for language in languages for worker_idx, worker in enumerate(workers)
for worker_idx, worker in enumerate(workers)] ]
def perf_report_processor_job(worker_host, perf_base_name, output_filename, flame_graph_reports): def perf_report_processor_job(worker_host, perf_base_name, output_filename,
flame_graph_reports):
print('Creating perf report collection job for %s' % worker_host) print('Creating perf report collection job for %s' % worker_host)
cmd = '' cmd = ''
if worker_host != 'localhost': if worker_host != 'localhost':
user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host) user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, worker_host)
cmd = "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s\ cmd = "USER_AT_HOST=%s OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%stools/run_tests/performance/process_remote_perf_flamegraphs.sh" % (
tools/run_tests/performance/process_remote_perf_flamegraphs.sh" \ user_at_host, output_filename, flame_graph_reports, perf_base_name)
% (user_at_host, output_filename, flame_graph_reports, perf_base_name)
else: else:
cmd = "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%s\ cmd = "OUTPUT_FILENAME=%s OUTPUT_DIR=%s PERF_BASE_NAME=%stools/run_tests/performance/process_local_perf_flamegraphs.sh" % (
tools/run_tests/performance/process_local_perf_flamegraphs.sh" \ output_filename, flame_graph_reports, perf_base_name)
% (output_filename, flame_graph_reports, perf_base_name)
return jobset.JobSpec(cmdline=cmd, return jobset.JobSpec(
cmdline=cmd,
timeout_seconds=3 * 60, timeout_seconds=3 * 60,
shell=True, shell=True,
verbose_success=True, verbose_success=True,
@ -306,13 +325,19 @@ def perf_report_processor_job(worker_host, perf_base_name, output_filename, flam
Scenario = collections.namedtuple('Scenario', 'jobspec workers name') Scenario = collections.namedtuple('Scenario', 'jobspec workers name')
def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*', def create_scenarios(languages,
category='all', bq_result_table=None, workers_by_lang,
netperf=False, netperf_hosts=[], server_cpu_load=0): remote_host=None,
regex='.*',
category='all',
bq_result_table=None,
netperf=False,
netperf_hosts=[],
server_cpu_load=0):
"""Create jobspecs for scenarios to run.""" """Create jobspecs for scenarios to run."""
all_workers = [worker all_workers = [
for workers in workers_by_lang.values() worker for workers in workers_by_lang.values() for worker in workers
for worker in workers] ]
scenarios = [] scenarios = []
_NO_WORKERS = [] _NO_WORKERS = []
@ -326,52 +351,66 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
else: else:
netperf_server = netperf_hosts[0] netperf_server = netperf_hosts[0]
netperf_client = netperf_hosts[1] netperf_client = netperf_hosts[1]
scenarios.append(Scenario( scenarios.append(
create_netperf_jobspec(server_host=netperf_server, Scenario(
create_netperf_jobspec(
server_host=netperf_server,
client_host=netperf_client, client_host=netperf_client,
bq_result_table=bq_result_table), bq_result_table=bq_result_table), _NO_WORKERS, 'netperf'))
_NO_WORKERS, 'netperf'))
for language in languages: for language in languages:
for scenario_json in language.scenarios(): for scenario_json in language.scenarios():
if re.search(regex, scenario_json['name']): if re.search(regex, scenario_json['name']):
categories = scenario_json.get('CATEGORIES', ['scalable', 'smoketest']) categories = scenario_json.get('CATEGORIES',
['scalable', 'smoketest'])
if category in categories or category == 'all': if category in categories or category == 'all':
workers = workers_by_lang[str(language)][:] workers = workers_by_lang[str(language)][:]
# 'SERVER_LANGUAGE' is an indicator for this script to pick # 'SERVER_LANGUAGE' is an indicator for this script to pick
# a server in a different language. # a server in a different language.
custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None) custom_server_lang = scenario_json.get('SERVER_LANGUAGE',
custom_client_lang = scenario_json.get('CLIENT_LANGUAGE', None) None)
scenario_json = scenario_config.remove_nonproto_fields(scenario_json) custom_client_lang = scenario_json.get('CLIENT_LANGUAGE',
None)
scenario_json = scenario_config.remove_nonproto_fields(
scenario_json)
if custom_server_lang and custom_client_lang: if custom_server_lang and custom_client_lang:
raise Exception('Cannot set both custom CLIENT_LANGUAGE and SERVER_LANGUAGE' raise Exception(
'Cannot set both custom CLIENT_LANGUAGE and SERVER_LANGUAGE'
'in the same scenario') 'in the same scenario')
if custom_server_lang: if custom_server_lang:
if not workers_by_lang.get(custom_server_lang, []): if not workers_by_lang.get(custom_server_lang, []):
print('Warning: Skipping scenario %s as' % scenario_json['name']) print('Warning: Skipping scenario %s as' %
print('SERVER_LANGUAGE is set to %s yet the language has ' scenario_json['name'])
'not been selected with -l' % custom_server_lang) print(
'SERVER_LANGUAGE is set to %s yet the language has '
'not been selected with -l' %
custom_server_lang)
continue continue
for idx in range(0, scenario_json['num_servers']): for idx in range(0, scenario_json['num_servers']):
# replace first X workers by workers of a different language # replace first X workers by workers of a different language
workers[idx] = workers_by_lang[custom_server_lang][idx] workers[idx] = workers_by_lang[custom_server_lang][
idx]
if custom_client_lang: if custom_client_lang:
if not workers_by_lang.get(custom_client_lang, []): if not workers_by_lang.get(custom_client_lang, []):
print('Warning: Skipping scenario %s as' % scenario_json['name']) print('Warning: Skipping scenario %s as' %
print('CLIENT_LANGUAGE is set to %s yet the language has ' scenario_json['name'])
'not been selected with -l' % custom_client_lang) print(
'CLIENT_LANGUAGE is set to %s yet the language has '
'not been selected with -l' %
custom_client_lang)
continue continue
for idx in range(scenario_json['num_servers'], len(workers)): for idx in range(scenario_json['num_servers'],
len(workers)):
# replace all client workers by workers of a different language, # replace all client workers by workers of a different language,
# leave num_server workers as they are server workers. # leave num_server workers as they are server workers.
workers[idx] = workers_by_lang[custom_client_lang][idx] workers[idx] = workers_by_lang[custom_client_lang][
idx]
scenario = Scenario( scenario = Scenario(
create_scenario_jobspec(scenario_json, create_scenario_jobspec(
[w.host_and_port for w in workers], scenario_json, [w.host_and_port for w in workers],
remote_host=remote_host, remote_host=remote_host,
bq_result_table=bq_result_table, bq_result_table=bq_result_table,
server_cpu_load=server_cpu_load), server_cpu_load=server_cpu_load), workers,
workers,
scenario_json['name']) scenario_json['name'])
scenarios.append(scenario) scenarios.append(scenario)
@ -396,15 +435,18 @@ def finish_qps_workers(jobs, qpsworker_jobs):
print('All QPS workers finished.') print('All QPS workers finished.')
return num_killed return num_killed
profile_output_files = [] profile_output_files = []
# Collect perf text reports and flamegraphs if perf_cmd was used # Collect perf text reports and flamegraphs if perf_cmd was used
# Note the base names of perf text reports are used when creating and processing # Note the base names of perf text reports are used when creating and processing
# perf data. The scenario name uniqifies the output name in the final # perf data. The scenario name uniqifies the output name in the final
# perf reports directory. # perf reports directory.
# Alos, the perf profiles need to be fetched and processed after each scenario # Alos, the perf profiles need to be fetched and processed after each scenario
# in order to avoid clobbering the output files. # in order to avoid clobbering the output files.
def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name, flame_graph_reports): def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name,
flame_graph_reports):
perf_report_jobs = [] perf_report_jobs = []
global profile_output_files global profile_output_files
for host_and_port in hosts_and_base_names: for host_and_port in hosts_and_base_names:
@ -413,51 +455,80 @@ def run_collect_perf_profile_jobs(hosts_and_base_names, scenario_name, flame_gra
# from the base filename, create .svg output filename # from the base filename, create .svg output filename
host = host_and_port.split(':')[0] host = host_and_port.split(':')[0]
profile_output_files.append('%s.svg' % output_filename) profile_output_files.append('%s.svg' % output_filename)
perf_report_jobs.append(perf_report_processor_job(host, perf_base_name, output_filename, flame_graph_reports)) perf_report_jobs.append(
perf_report_processor_job(host, perf_base_name, output_filename,
jobset.message('START', 'Collecting perf reports from qps workers', do_newline=True) flame_graph_reports))
failures, _ = jobset.run(perf_report_jobs, newline_on_success=True, maxjobs=1)
jobset.message('END', 'Collecting perf reports from qps workers', do_newline=True) jobset.message(
'START', 'Collecting perf reports from qps workers', do_newline=True)
failures, _ = jobset.run(
perf_report_jobs, newline_on_success=True, maxjobs=1)
jobset.message(
'END', 'Collecting perf reports from qps workers', do_newline=True)
return failures return failures
def main(): def main():
argp = argparse.ArgumentParser(description='Run performance tests.') argp = argparse.ArgumentParser(description='Run performance tests.')
argp.add_argument('-l', '--language', argp.add_argument(
'-l',
'--language',
choices=['all'] + sorted(scenario_config.LANGUAGES.keys()), choices=['all'] + sorted(scenario_config.LANGUAGES.keys()),
nargs='+', nargs='+',
required=True, required=True,
help='Languages to benchmark.') help='Languages to benchmark.')
argp.add_argument('--remote_driver_host', argp.add_argument(
'--remote_driver_host',
default=None, default=None,
help='Run QPS driver on given host. By default, QPS driver is run locally.') help='Run QPS driver on given host. By default, QPS driver is run locally.'
argp.add_argument('--remote_worker_host', )
argp.add_argument(
'--remote_worker_host',
nargs='+', nargs='+',
default=[], default=[],
help='Worker hosts where to start QPS workers.') help='Worker hosts where to start QPS workers.')
argp.add_argument('--dry_run', argp.add_argument(
'--dry_run',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Just list scenarios to be run, but don\'t run them.') help='Just list scenarios to be run, but don\'t run them.')
argp.add_argument('-r', '--regex', default='.*', type=str, argp.add_argument(
'-r',
'--regex',
default='.*',
type=str,
help='Regex to select scenarios to run.') help='Regex to select scenarios to run.')
argp.add_argument('--bq_result_table', default=None, type=str, argp.add_argument(
'--bq_result_table',
default=None,
type=str,
help='Bigquery "dataset.table" to upload results to.') help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--category', argp.add_argument(
'--category',
choices=['smoketest', 'all', 'scalable', 'sweep'], choices=['smoketest', 'all', 'scalable', 'sweep'],
default='all', default='all',
help='Select a category of tests to run.') help='Select a category of tests to run.')
argp.add_argument('--netperf', argp.add_argument(
'--netperf',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Run netperf benchmark as one of the scenarios.') help='Run netperf benchmark as one of the scenarios.')
argp.add_argument('--server_cpu_load', argp.add_argument(
default=0, type=int, '--server_cpu_load',
help='Select a targeted server cpu load to run. 0 means ignore this flag') default=0,
argp.add_argument('-x', '--xml_report', default='report.xml', type=str, type=int,
help='Select a targeted server cpu load to run. 0 means ignore this flag'
)
argp.add_argument(
'-x',
'--xml_report',
default='report.xml',
type=str,
help='Name of XML report file to generate.') help='Name of XML report file to generate.')
argp.add_argument('--perf_args', argp.add_argument(
'--perf_args',
help=('Example usage: "--perf_args=record -F 99 -g". ' help=('Example usage: "--perf_args=record -F 99 -g". '
'Wrap QPS workers in a perf command ' 'Wrap QPS workers in a perf command '
'with the arguments to perf specified here. ' 'with the arguments to perf specified here. '
@ -476,16 +547,26 @@ def main():
'not be used at all. ' 'not be used at all. '
'See http://www.brendangregg.com/perf.html ' 'See http://www.brendangregg.com/perf.html '
'for more general perf examples.')) 'for more general perf examples.'))
argp.add_argument('--skip_generate_flamegraphs', argp.add_argument(
'--skip_generate_flamegraphs',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help=('Turn flame graph generation off. ' help=('Turn flame graph generation off. '
'May be useful if "perf_args" arguments do not make sense for ' 'May be useful if "perf_args" arguments do not make sense for '
'generating flamegraphs (e.g., "--perf_args=stat ...")')) 'generating flamegraphs (e.g., "--perf_args=stat ...")'))
argp.add_argument('-f', '--flame_graph_reports', default='perf_reports', type=str, argp.add_argument(
help='Name of directory to output flame graph profiles to, if any are created.') '-f',
argp.add_argument('-u', '--remote_host_username', default='', type=str, '--flame_graph_reports',
default='perf_reports',
type=str,
help='Name of directory to output flame graph profiles to, if any are created.'
)
argp.add_argument(
'-u',
'--remote_host_username',
default='',
type=str,
help='Use a username that isn\'t "Jenkins" to SSH into remote workers.') help='Use a username that isn\'t "Jenkins" to SSH into remote workers.')
args = argp.parse_args() args = argp.parse_args()
@ -494,11 +575,11 @@ def main():
if args.remote_host_username: if args.remote_host_username:
_REMOTE_HOST_USERNAME = args.remote_host_username _REMOTE_HOST_USERNAME = args.remote_host_username
languages = set(scenario_config.LANGUAGES[l] languages = set(
scenario_config.LANGUAGES[l]
for l in itertools.chain.from_iterable( for l in itertools.chain.from_iterable(
six.iterkeys(scenario_config.LANGUAGES) if x == 'all' six.iterkeys(scenario_config.LANGUAGES) if x == 'all' else [x]
else [x] for x in args.language)) for x in args.language))
# Put together set of remote hosts where to run and build # Put together set of remote hosts where to run and build
remote_hosts = set() remote_hosts = set()
@ -519,7 +600,10 @@ def main():
if not args.remote_driver_host: if not args.remote_driver_host:
build_local = True build_local = True
if not args.dry_run: if not args.dry_run:
build_on_remote_hosts(remote_hosts, languages=[str(l) for l in languages], build_local=build_local) build_on_remote_hosts(
remote_hosts,
languages=[str(l) for l in languages],
build_local=build_local)
perf_cmd = None perf_cmd = None
if args.perf_args: if args.perf_args:
@ -528,14 +612,16 @@ def main():
perf_cmd = ['/usr/bin/perf'] perf_cmd = ['/usr/bin/perf']
perf_cmd.extend(re.split('\s+', args.perf_args)) perf_cmd.extend(re.split('\s+', args.perf_args))
qpsworker_jobs = create_qpsworkers(languages, args.remote_worker_host, perf_cmd=perf_cmd) qpsworker_jobs = create_qpsworkers(
languages, args.remote_worker_host, perf_cmd=perf_cmd)
# get list of worker addresses for each language. # get list of worker addresses for each language.
workers_by_lang = dict([(str(language), []) for language in languages]) workers_by_lang = dict([(str(language), []) for language in languages])
for job in qpsworker_jobs: for job in qpsworker_jobs:
workers_by_lang[str(job.language)].append(job) workers_by_lang[str(job.language)].append(job)
scenarios = create_scenarios(languages, scenarios = create_scenarios(
languages,
workers_by_lang=workers_by_lang, workers_by_lang=workers_by_lang,
remote_host=args.remote_driver_host, remote_host=args.remote_driver_host,
regex=args.regex, regex=args.regex,
@ -563,40 +649,53 @@ def main():
worker.start() worker.start()
jobs = [scenario.jobspec] jobs = [scenario.jobspec]
if scenario.workers: if scenario.workers:
jobs.append(create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)) jobs.append(
scenario_failures, resultset = jobset.run(jobs, newline_on_success=True, maxjobs=1) create_quit_jobspec(
scenario.workers,
remote_host=args.remote_driver_host))
scenario_failures, resultset = jobset.run(
jobs, newline_on_success=True, maxjobs=1)
total_scenario_failures += scenario_failures total_scenario_failures += scenario_failures
merged_resultset = dict(itertools.chain(six.iteritems(merged_resultset), merged_resultset = dict(
itertools.chain(
six.iteritems(merged_resultset),
six.iteritems(resultset))) six.iteritems(resultset)))
finally: finally:
# Consider qps workers that need to be killed as failures # Consider qps workers that need to be killed as failures
qps_workers_killed += finish_qps_workers(scenario.workers, qpsworker_jobs) qps_workers_killed += finish_qps_workers(scenario.workers,
qpsworker_jobs)
if perf_cmd and scenario_failures == 0 and not args.skip_generate_flamegraphs: if perf_cmd and scenario_failures == 0 and not args.skip_generate_flamegraphs:
workers_and_base_names = {} workers_and_base_names = {}
for worker in scenario.workers: for worker in scenario.workers:
if not worker.perf_file_base_name: if not worker.perf_file_base_name:
raise Exception('using perf but perf report filename is unspecified') raise Exception(
workers_and_base_names[worker.host_and_port] = worker.perf_file_base_name 'using perf but perf report filename is unspecified')
perf_report_failures += run_collect_perf_profile_jobs(workers_and_base_names, scenario.name, args.flame_graph_reports) workers_and_base_names[
worker.host_and_port] = worker.perf_file_base_name
perf_report_failures += run_collect_perf_profile_jobs(
workers_and_base_names, scenario.name,
args.flame_graph_reports)
# Still write the index.html even if some scenarios failed. # Still write the index.html even if some scenarios failed.
# 'profile_output_files' will only have names for scenarios that passed # 'profile_output_files' will only have names for scenarios that passed
if perf_cmd and not args.skip_generate_flamegraphs: if perf_cmd and not args.skip_generate_flamegraphs:
# write the index file to the output dir, with all profiles from all scenarios/workers # write the index file to the output dir, with all profiles from all scenarios/workers
report_utils.render_perf_profiling_results('%s/index.html' % args.flame_graph_reports, profile_output_files) report_utils.render_perf_profiling_results(
'%s/index.html' % args.flame_graph_reports, profile_output_files)
report_utils.render_junit_xml_report(merged_resultset, args.xml_report, report_utils.render_junit_xml_report(
suite_name='benchmarks') merged_resultset, args.xml_report, suite_name='benchmarks')
if total_scenario_failures > 0 or qps_workers_killed > 0: if total_scenario_failures > 0 or qps_workers_killed > 0:
print('%s scenarios failed and %s qps worker jobs killed' % (total_scenario_failures, qps_workers_killed)) print('%s scenarios failed and %s qps worker jobs killed' %
(total_scenario_failures, qps_workers_killed))
sys.exit(1) sys.exit(1)
if perf_report_failures > 0: if perf_report_failures > 0:
print('%s perf profile collection jobs failed' % perf_report_failures) print('%s perf profile collection jobs failed' % perf_report_failures)
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
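As the comments in create_scenario_jobspec note, the driver invocation is assembled as one shell command that sets QPS_WORKERS (and optionally BQ_RESULT_TABLE) inline, so the identical string works locally and when wrapped in ssh. A standalone sketch of the command it builds; the worker addresses, scenario name, and BigQuery table below are hypothetical sample values:

# Sketch of the command string built by create_scenario_jobspec above;
# all concrete values here are hypothetical.
import json
import pipes

workers = ['10.0.0.1:10000', '10.0.0.2:10010']
scenario_json = {'name': 'cpp_protobuf_async_unary_qps_unconstrained'}
bq_result_table = 'performance.test_results'

cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
if bq_result_table:
    cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
cmd += 'tools/run_tests/performance/run_qps_driver.sh '
cmd += '--scenarios_json=%s ' % pipes.quote(
    json.dumps({'scenarios': [scenario_json]}))
cmd += '--scenario_result_file=scenario_result.json '
print(cmd)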

File diff suppressed because it is too large

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Run test matrix.""" """Run test matrix."""
from __future__ import print_function from __future__ import print_function
@ -55,27 +54,31 @@ def _report_filename_internal_ci(name):
return '%s/%s' % (name, _REPORT_SUFFIX) return '%s/%s' % (name, _REPORT_SUFFIX)
def _docker_jobspec(name, runtests_args=[], runtests_envs={}, def _docker_jobspec(name,
runtests_args=[],
runtests_envs={},
inner_jobs=_DEFAULT_INNER_JOBS, inner_jobs=_DEFAULT_INNER_JOBS,
timeout_seconds=None): timeout_seconds=None):
"""Run a single instance of run_tests.py in a docker container""" """Run a single instance of run_tests.py in a docker container"""
if not timeout_seconds: if not timeout_seconds:
timeout_seconds = _DEFAULT_RUNTESTS_TIMEOUT timeout_seconds = _DEFAULT_RUNTESTS_TIMEOUT
test_job = jobset.JobSpec( test_job = jobset.JobSpec(
cmdline=['python', 'tools/run_tests/run_tests.py', cmdline=[
'--use_docker', 'python', 'tools/run_tests/run_tests.py', '--use_docker', '-t',
'-t', '-j', str(inner_jobs), '-x', _report_filename(name),
'-j', str(inner_jobs), '--report_suite_name', '%s' % name
'-x', _report_filename(name), ] + runtests_args,
'--report_suite_name', '%s' % name] + runtests_args,
environ=runtests_envs, environ=runtests_envs,
shortname='run_tests_%s' % name, shortname='run_tests_%s' % name,
timeout_seconds=timeout_seconds) timeout_seconds=timeout_seconds)
return test_job return test_job
def _workspace_jobspec(name, runtests_args=[], workspace_name=None, def _workspace_jobspec(name,
runtests_envs={}, inner_jobs=_DEFAULT_INNER_JOBS, runtests_args=[],
workspace_name=None,
runtests_envs={},
inner_jobs=_DEFAULT_INNER_JOBS,
timeout_seconds=None): timeout_seconds=None):
"""Run a single instance of run_tests.py in a separate workspace""" """Run a single instance of run_tests.py in a separate workspace"""
if not workspace_name: if not workspace_name:
@ -85,35 +88,41 @@ def _workspace_jobspec(name, runtests_args=[], workspace_name=None,
env = {'WORKSPACE_NAME': workspace_name} env = {'WORKSPACE_NAME': workspace_name}
env.update(runtests_envs) env.update(runtests_envs)
test_job = jobset.JobSpec( test_job = jobset.JobSpec(
cmdline=['bash', cmdline=[
'tools/run_tests/helper_scripts/run_tests_in_workspace.sh', 'bash', 'tools/run_tests/helper_scripts/run_tests_in_workspace.sh',
'-t', '-t', '-j', str(inner_jobs), '-x', '../%s' % _report_filename(name),
'-j', str(inner_jobs), '--report_suite_name', '%s' % name
'-x', '../%s' % _report_filename(name), ] + runtests_args,
'--report_suite_name', '%s' % name] + runtests_args,
environ=env, environ=env,
shortname='run_tests_%s' % name, shortname='run_tests_%s' % name,
timeout_seconds=timeout_seconds) timeout_seconds=timeout_seconds)
return test_job return test_job
def _generate_jobs(languages, configs, platforms, iomgr_platform = 'native', def _generate_jobs(languages,
arch=None, compiler=None, configs,
labels=[], extra_args=[], extra_envs={}, platforms,
iomgr_platform='native',
arch=None,
compiler=None,
labels=[],
extra_args=[],
extra_envs={},
inner_jobs=_DEFAULT_INNER_JOBS, inner_jobs=_DEFAULT_INNER_JOBS,
timeout_seconds=None): timeout_seconds=None):
result = [] result = []
for language in languages: for language in languages:
for platform in platforms: for platform in platforms:
for config in configs: for config in configs:
name = '%s_%s_%s_%s' % (language, platform, config, iomgr_platform) name = '%s_%s_%s_%s' % (language, platform, config,
runtests_args = ['-l', language, iomgr_platform)
'-c', config, runtests_args = [
'--iomgr_platform', iomgr_platform] '-l', language, '-c', config, '--iomgr_platform',
iomgr_platform
]
if arch or compiler: if arch or compiler:
name += '_%s_%s' % (arch, compiler) name += '_%s_%s' % (arch, compiler)
runtests_args += ['--arch', arch, runtests_args += ['--arch', arch, '--compiler', compiler]
'--compiler', compiler]
if '--build_only' in extra_args: if '--build_only' in extra_args:
name += '_buildonly' name += '_buildonly'
for extra_env in extra_envs: for extra_env in extra_envs:
@ -121,15 +130,22 @@ def _generate_jobs(languages, configs, platforms, iomgr_platform = 'native',
runtests_args += extra_args runtests_args += extra_args
if platform == 'linux': if platform == 'linux':
job = _docker_jobspec(name=name, runtests_args=runtests_args, job = _docker_jobspec(
runtests_envs=extra_envs, inner_jobs=inner_jobs, name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds) timeout_seconds=timeout_seconds)
else: else:
job = _workspace_jobspec(name=name, runtests_args=runtests_args, job = _workspace_jobspec(
runtests_envs=extra_envs, inner_jobs=inner_jobs, name=name,
runtests_args=runtests_args,
runtests_envs=extra_envs,
inner_jobs=inner_jobs,
timeout_seconds=timeout_seconds) timeout_seconds=timeout_seconds)
job.labels = [platform, config, language, iomgr_platform] + labels job.labels = [platform, config, language, iomgr_platform
] + labels
result.append(job) result.append(job)
return result return result
@ -137,7 +153,8 @@ def _generate_jobs(languages, configs, platforms, iomgr_platform = 'native',
def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS): def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
test_jobs = [] test_jobs = []
# supported on linux only # supported on linux only
test_jobs += _generate_jobs(languages=['sanity', 'php7'], test_jobs += _generate_jobs(
languages=['sanity', 'php7'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['linux'], platforms=['linux'],
labels=['basictests', 'multilang'], labels=['basictests', 'multilang'],
@ -145,7 +162,8 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# supported on all platforms. # supported on all platforms.
test_jobs += _generate_jobs(languages=['c'], test_jobs += _generate_jobs(
languages=['c'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['linux', 'macos', 'windows'], platforms=['linux', 'macos', 'windows'],
labels=['basictests', 'corelang'], labels=['basictests', 'corelang'],
@ -153,7 +171,8 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs, inner_jobs=inner_jobs,
timeout_seconds=_CPP_RUNTESTS_TIMEOUT) timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
test_jobs += _generate_jobs(languages=['csharp', 'python'], test_jobs += _generate_jobs(
languages=['csharp', 'python'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['linux', 'macos', 'windows'], platforms=['linux', 'macos', 'windows'],
labels=['basictests', 'multilang'], labels=['basictests', 'multilang'],
@ -161,7 +180,8 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# supported on linux and mac. # supported on linux and mac.
test_jobs += _generate_jobs(languages=['c++'], test_jobs += _generate_jobs(
languages=['c++'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['linux', 'macos'], platforms=['linux', 'macos'],
labels=['basictests', 'corelang'], labels=['basictests', 'corelang'],
@ -169,7 +189,8 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs, inner_jobs=inner_jobs,
timeout_seconds=_CPP_RUNTESTS_TIMEOUT) timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
test_jobs += _generate_jobs(languages=['grpc-node', 'ruby', 'php'], test_jobs += _generate_jobs(
languages=['grpc-node', 'ruby', 'php'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['linux', 'macos'], platforms=['linux', 'macos'],
labels=['basictests', 'multilang'], labels=['basictests', 'multilang'],
@ -177,7 +198,8 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# supported on mac only. # supported on mac only.
test_jobs += _generate_jobs(languages=['objc'], test_jobs += _generate_jobs(
languages=['objc'],
configs=['dbg', 'opt'], configs=['dbg', 'opt'],
platforms=['macos'], platforms=['macos'],
labels=['basictests', 'multilang'], labels=['basictests', 'multilang'],
@ -185,21 +207,24 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# sanitizers # sanitizers
test_jobs += _generate_jobs(languages=['c'], test_jobs += _generate_jobs(
languages=['c'],
configs=['msan', 'asan', 'tsan', 'ubsan'], configs=['msan', 'asan', 'tsan', 'ubsan'],
platforms=['linux'], platforms=['linux'],
labels=['sanitizers', 'corelang'], labels=['sanitizers', 'corelang'],
extra_args=extra_args, extra_args=extra_args,
inner_jobs=inner_jobs, inner_jobs=inner_jobs,
timeout_seconds=_CPP_RUNTESTS_TIMEOUT) timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
test_jobs += _generate_jobs(languages=['c++'], test_jobs += _generate_jobs(
languages=['c++'],
configs=['asan'], configs=['asan'],
platforms=['linux'], platforms=['linux'],
labels=['sanitizers', 'corelang'], labels=['sanitizers', 'corelang'],
extra_args=extra_args, extra_args=extra_args,
inner_jobs=inner_jobs, inner_jobs=inner_jobs,
timeout_seconds=_CPP_RUNTESTS_TIMEOUT) timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
test_jobs += _generate_jobs(languages=['c++'], test_jobs += _generate_jobs(
languages=['c++'],
configs=['tsan'], configs=['tsan'],
platforms=['linux'], platforms=['linux'],
labels=['sanitizers', 'corelang'], labels=['sanitizers', 'corelang'],
@ -210,10 +235,12 @@ def _create_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS):
return test_jobs return test_jobs
def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS): def _create_portability_test_jobs(extra_args=[],
inner_jobs=_DEFAULT_INNER_JOBS):
test_jobs = [] test_jobs = []
# portability C x86 # portability C x86
test_jobs += _generate_jobs(languages=['c'], test_jobs += _generate_jobs(
languages=['c'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
arch='x86', arch='x86',
@ -223,9 +250,11 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# portability C and C++ on x64 # portability C and C++ on x64
for compiler in ['gcc4.8', 'gcc5.3', 'gcc_musl', for compiler in [
'clang3.5', 'clang3.6', 'clang3.7']: 'gcc4.8', 'gcc5.3', 'gcc_musl', 'clang3.5', 'clang3.6', 'clang3.7'
test_jobs += _generate_jobs(languages=['c', 'c++'], ]:
test_jobs += _generate_jobs(
languages=['c', 'c++'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
arch='x64', arch='x64',
@ -236,7 +265,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
timeout_seconds=_CPP_RUNTESTS_TIMEOUT) timeout_seconds=_CPP_RUNTESTS_TIMEOUT)
# portability C on Windows 64-bit (x86 is the default) # portability C on Windows 64-bit (x86 is the default)
test_jobs += _generate_jobs(languages=['c'], test_jobs += _generate_jobs(
languages=['c'],
configs=['dbg'], configs=['dbg'],
platforms=['windows'], platforms=['windows'],
arch='x64', arch='x64',
@ -247,7 +277,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
# portability C++ on Windows # portability C++ on Windows
# TODO(jtattermusch): some of the tests are failing, so we force --build_only # TODO(jtattermusch): some of the tests are failing, so we force --build_only
test_jobs += _generate_jobs(languages=['c++'], test_jobs += _generate_jobs(
languages=['c++'],
configs=['dbg'], configs=['dbg'],
platforms=['windows'], platforms=['windows'],
arch='default', arch='default',
@ -258,7 +289,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
# portability C and C++ on Windows using VS2017 (build only) # portability C and C++ on Windows using VS2017 (build only)
# TODO(jtattermusch): some of the tests are failing, so we force --build_only # TODO(jtattermusch): some of the tests are failing, so we force --build_only
test_jobs += _generate_jobs(languages=['c', 'c++'], test_jobs += _generate_jobs(
languages=['c', 'c++'],
configs=['dbg'], configs=['dbg'],
platforms=['windows'], platforms=['windows'],
arch='x64', arch='x64',
@ -268,8 +300,10 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
# C and C++ with the c-ares DNS resolver on Linux # C and C++ with the c-ares DNS resolver on Linux
test_jobs += _generate_jobs(languages=['c', 'c++'], test_jobs += _generate_jobs(
configs=['dbg'], platforms=['linux'], languages=['c', 'c++'],
configs=['dbg'],
platforms=['linux'],
labels=['portability', 'corelang'], labels=['portability', 'corelang'],
extra_args=extra_args, extra_args=extra_args,
extra_envs={'GRPC_DNS_RESOLVER': 'ares'}, extra_envs={'GRPC_DNS_RESOLVER': 'ares'},
@ -286,7 +320,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
# C and C++ build with cmake on Linux # C and C++ build with cmake on Linux
# TODO(jtattermusch): some of the tests are failing, so we force --build_only # TODO(jtattermusch): some of the tests are failing, so we force --build_only
# to make sure it's buildable at least. # to make sure it's buildable at least.
test_jobs += _generate_jobs(languages=['c', 'c++'], test_jobs += _generate_jobs(
languages=['c', 'c++'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
arch='default', arch='default',
@ -295,7 +330,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
extra_args=extra_args + ['--build_only'], extra_args=extra_args + ['--build_only'],
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
test_jobs += _generate_jobs(languages=['python'], test_jobs += _generate_jobs(
languages=['python'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
arch='default', arch='default',
@ -304,7 +340,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
extra_args=extra_args, extra_args=extra_args,
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
test_jobs += _generate_jobs(languages=['csharp'], test_jobs += _generate_jobs(
languages=['csharp'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
arch='default', arch='default',
@ -313,7 +350,8 @@ def _create_portability_test_jobs(extra_args=[], inner_jobs=_DEFAULT_INNER_JOBS)
extra_args=extra_args, extra_args=extra_args,
inner_jobs=inner_jobs) inner_jobs=inner_jobs)
test_jobs += _generate_jobs(languages=['c'], test_jobs += _generate_jobs(
languages=['c'],
configs=['dbg'], configs=['dbg'],
platforms=['linux'], platforms=['linux'],
iomgr_platform='uv', iomgr_platform='uv',
@ -346,59 +384,83 @@ def _runs_per_test_type(arg_str):
if __name__ == "__main__": if __name__ == "__main__":
argp = argparse.ArgumentParser(description='Run a matrix of run_tests.py tests.') argp = argparse.ArgumentParser(
argp.add_argument('-j', '--jobs', description='Run a matrix of run_tests.py tests.')
argp.add_argument(
'-j',
'--jobs',
default=multiprocessing.cpu_count() / _DEFAULT_INNER_JOBS, default=multiprocessing.cpu_count() / _DEFAULT_INNER_JOBS,
type=int, type=int,
help='Number of concurrent run_tests.py instances.') help='Number of concurrent run_tests.py instances.')
argp.add_argument('-f', '--filter', argp.add_argument(
'-f',
'--filter',
choices=_allowed_labels(), choices=_allowed_labels(),
nargs='+', nargs='+',
default=[], default=[],
help='Filter targets to run by label with AND semantics.') help='Filter targets to run by label with AND semantics.')
argp.add_argument('--exclude', argp.add_argument(
'--exclude',
choices=_allowed_labels(), choices=_allowed_labels(),
nargs='+', nargs='+',
default=[], default=[],
help='Exclude targets with any of given labels.') help='Exclude targets with any of given labels.')
argp.add_argument('--build_only', argp.add_argument(
'--build_only',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Pass --build_only flag to run_tests.py instances.') help='Pass --build_only flag to run_tests.py instances.')
argp.add_argument('--force_default_poller', default=False, action='store_const', const=True, argp.add_argument(
'--force_default_poller',
default=False,
action='store_const',
const=True,
help='Pass --force_default_poller to run_tests.py instances.') help='Pass --force_default_poller to run_tests.py instances.')
argp.add_argument('--dry_run', argp.add_argument(
'--dry_run',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Only print what would be run.') help='Only print what would be run.')
argp.add_argument('--filter_pr_tests', argp.add_argument(
'--filter_pr_tests',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Filters out tests irrelevant to pull request changes.') help='Filters out tests irrelevant to pull request changes.')
argp.add_argument('--base_branch', argp.add_argument(
'--base_branch',
default='origin/master', default='origin/master',
type=str, type=str,
help='Branch that pull request is requesting to merge into') help='Branch that pull request is requesting to merge into')
argp.add_argument('--inner_jobs', argp.add_argument(
'--inner_jobs',
default=_DEFAULT_INNER_JOBS, default=_DEFAULT_INNER_JOBS,
type=int, type=int,
help='Number of jobs in each run_tests.py instance') help='Number of jobs in each run_tests.py instance')
argp.add_argument('-n', '--runs_per_test', default=1, type=_runs_per_test_type, argp.add_argument(
'-n',
'--runs_per_test',
default=1,
type=_runs_per_test_type,
help='How many times to run each test. >1 runs implies ' + help='How many times to run each test. >1 runs implies ' +
'omitting passing tests from the output & reports.') 'omitting passing tests from the output & reports.')
argp.add_argument('--max_time', default=-1, type=int, argp.add_argument(
'--max_time',
default=-1,
type=int,
help='Maximum amount of time to run tests for ' + help='Maximum amount of time to run tests for ' +
'(other tests will be skipped)') '(other tests will be skipped)')
argp.add_argument('--internal_ci', argp.add_argument(
'--internal_ci',
default=False, default=False,
action='store_const', action='store_const',
const=True, const=True,
help='Put reports into subdirectories to improve presentation of ' help='Put reports into subdirectories to improve presentation of '
'results by Internal CI.') 'results by Internal CI.')
argp.add_argument('--bq_result_table', argp.add_argument(
'--bq_result_table',
default='', default='',
type=str, type=str,
nargs='?', nargs='?',
@ -430,13 +492,15 @@ if __name__ == "__main__":
jobs = [] jobs = []
for job in all_jobs: for job in all_jobs:
if not args.filter or all(filter in job.labels for filter in args.filter): if not args.filter or all(filter in job.labels
if not any(exclude_label in job.labels for exclude_label in args.exclude): for filter in args.filter):
if not any(exclude_label in job.labels
for exclude_label in args.exclude):
jobs.append(job) jobs.append(job)
if not jobs: if not jobs:
jobset.message('FAILED', 'No test suites match given criteria.', jobset.message(
do_newline=True) 'FAILED', 'No test suites match given criteria.', do_newline=True)
sys.exit(1) sys.exit(1)
print('IMPORTANT: The changes you are testing need to be locally committed') print('IMPORTANT: The changes you are testing need to be locally committed')
@ -472,22 +536,26 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
jobset.message('START', 'Running test matrix.', do_newline=True) jobset.message('START', 'Running test matrix.', do_newline=True)
num_failures, resultset = jobset.run(jobs, num_failures, resultset = jobset.run(
newline_on_success=True, jobs, newline_on_success=True, travis=True, maxjobs=args.jobs)
travis=True,
maxjobs=args.jobs)
# Merge skipped tests into results to show skipped tests on report.xml # Merge skipped tests into results to show skipped tests on report.xml
if skipped_jobs: if skipped_jobs:
ignored_num_skipped_failures, skipped_results = jobset.run( ignored_num_skipped_failures, skipped_results = jobset.run(
skipped_jobs, skip_jobs=True) skipped_jobs, skip_jobs=True)
resultset.update(skipped_results) resultset.update(skipped_results)
report_utils.render_junit_xml_report(resultset, _report_filename('aggregate_tests'), report_utils.render_junit_xml_report(
resultset,
_report_filename('aggregate_tests'),
suite_name='aggregate_tests') suite_name='aggregate_tests')
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', 'All run_tests.py instances finished successfully.', jobset.message(
'SUCCESS',
do_newline=True) 'All run_tests.py instances finished successfully.',
do_newline=True) do_newline=True)
else: else:
jobset.message('FAILED', 'Some run_tests.py instances have failed.', jobset.message(
'FAILED',
do_newline=True) 'Some run_tests.py instances have failed.',
do_newline=True) do_newline=True)
sys.exit(1) sys.exit(1)
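_generate_jobs above expands the language/platform/config matrix into one run_tests.py invocation each and derives the job name from that tuple. A small sketch of the part of the naming rule visible in these hunks (the extra_envs handling that follows is cut off by the diff); the sample values at the bottom are hypothetical:

# Sketch of the job-naming rule shown in _generate_jobs; sample inputs are
# hypothetical.
def job_name(language, platform, config, iomgr_platform='native',
             arch=None, compiler=None, build_only=False):
    name = '%s_%s_%s_%s' % (language, platform, config, iomgr_platform)
    if arch or compiler:
        name += '_%s_%s' % (arch, compiler)
    if build_only:
        name += '_buildonly'
    return name

print(job_name('c++', 'linux', 'dbg'))  # c++_linux_dbg_native
print(job_name('c', 'windows', 'dbg', arch='x64', compiler='default'))
# -> c_windows_dbg_native_x64_default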

@ -13,7 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
""" """
Wrapper around port server starting code. Wrapper around port server starting code.

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Runs selected gRPC test/build tasks.""" """Runs selected gRPC test/build tasks."""
from __future__ import print_function from __future__ import print_function
@ -32,10 +31,10 @@ _TARGETS += artifact_targets.targets()
_TARGETS += distribtest_targets.targets() _TARGETS += distribtest_targets.targets()
_TARGETS += package_targets.targets() _TARGETS += package_targets.targets()
def _create_build_map(): def _create_build_map():
"""Maps task names and labels to list of tasks to be built.""" """Maps task names and labels to list of tasks to be built."""
target_build_map = dict([(target.name, [target]) target_build_map = dict([(target.name, [target]) for target in _TARGETS])
for target in _TARGETS])
if len(_TARGETS) > len(target_build_map.keys()): if len(_TARGETS) > len(target_build_map.keys()):
raise Exception('Target names need to be unique') raise Exception('Target names need to be unique')
@ -56,21 +55,23 @@ def _create_build_map():
_BUILD_MAP = _create_build_map() _BUILD_MAP = _create_build_map()
argp = argparse.ArgumentParser(description='Runs build/test targets.') argp = argparse.ArgumentParser(description='Runs build/test targets.')
argp.add_argument('-b', '--build', argp.add_argument(
'-b',
'--build',
choices=sorted(_BUILD_MAP.keys()), choices=sorted(_BUILD_MAP.keys()),
nargs='+', nargs='+',
default=['all'], default=['all'],
help='Target name or target label to build.') help='Target name or target label to build.')
argp.add_argument('-f', '--filter', argp.add_argument(
'-f',
'--filter',
choices=sorted(_BUILD_MAP.keys()), choices=sorted(_BUILD_MAP.keys()),
nargs='+', nargs='+',
default=[], default=[],
help='Filter targets to build with AND semantics.') help='Filter targets to build with AND semantics.')
argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int)
argp.add_argument('-t', '--travis', argp.add_argument(
default=False, '-t', '--travis', default=False, action='store_const', const=True)
action='store_const',
const=True)
args = argp.parse_args() args = argp.parse_args()
@ -104,12 +105,11 @@ if not build_jobs:
jobset.message('START', 'Building targets.', do_newline=True) jobset.message('START', 'Building targets.', do_newline=True)
num_failures, resultset = jobset.run( num_failures, resultset = jobset.run(
build_jobs, newline_on_success=True, maxjobs=args.jobs) build_jobs, newline_on_success=True, maxjobs=args.jobs)
report_utils.render_junit_xml_report(resultset, 'report_taskrunner_sponge_log.xml', report_utils.render_junit_xml_report(
suite_name='tasks') resultset, 'report_taskrunner_sponge_log.xml', suite_name='tasks')
if num_failures == 0: if num_failures == 0:
jobset.message('SUCCESS', 'All targets built successfully.', jobset.message(
do_newline=True) 'SUCCESS', 'All targets built successfully.', do_newline=True)
else: else:
jobset.message('FAILED', 'Failed to build targets.', jobset.message('FAILED', 'Failed to build targets.', do_newline=True)
do_newline=True)
sys.exit(1) sys.exit(1)
