Merge pull request #6188 from jtattermusch/performance_bigquery_support

Benchmarking: support uploading to bigquery
pull/6124/head^2
Jan Tattermusch 9 years ago
commit ec138dd3cb
  1. 18
      tools/gcp/utils/big_query_utils.py
  2. 103
      tools/run_tests/performance/bq_upload_result.py
  3. 40
      tools/run_tests/performance/run_qps_driver.sh
  4. 202
      tools/run_tests/performance/scenario_result_schema.json
  5. 28
      tools/run_tests/run_performance_tests.py

@ -71,16 +71,22 @@ def create_dataset(biq_query, project_id, dataset_id):
def create_table(big_query, project_id, dataset_id, table_id, table_schema,
description):
fields = [{'name': field_name,
'type': field_type,
'description': field_description
} for (field_name, field_type, field_description) in table_schema]
return create_table2(big_query, project_id, dataset_id, table_id,
fields, description)
def create_table2(big_query, project_id, dataset_id, table_id, fields_schema,
description):
is_success = True
body = {
'description': description,
'schema': {
'fields': [{
'name': field_name,
'type': field_type,
'description': field_description
} for (field_name, field_type, field_description) in table_schema]
'fields': fields_schema
},
'tableReference': {
'datasetId': dataset_id,
@ -112,9 +118,7 @@ def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
datasetId=dataset_id,
tableId=table_id,
body=body)
print body
res = insert_req.execute(num_retries=NUM_RETRIES)
print res
except HttpError as http_error:
print 'Error in inserting rows in the table %s' % table_id
is_success = False

@ -0,0 +1,103 @@
#!/usr/bin/env python2.7
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Uploads performance benchmark result file to bigquery.
import argparse
import json
import os
import sys
import uuid
gcp_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(gcp_utils_dir)
import big_query_utils
_PROJECT_ID='grpc-testing'
def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file):
bq = big_query_utils.create_big_query()
_create_results_table(bq, dataset_id, table_id)
with open(result_file, 'r') as f:
scenario_result = json.loads(f.read())
if not _insert_result(bq, dataset_id, table_id, scenario_result):
print 'Error uploading result to bigquery.'
sys.exit(1)
def _insert_result(bq, dataset_id, table_id, scenario_result):
_flatten_result_inplace(scenario_result)
row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
return big_query_utils.insert_rows(bq,
_PROJECT_ID,
dataset_id,
table_id,
[row])
def _create_results_table(bq, dataset_id, table_id):
with open(os.path.dirname(__file__) + '/scenario_result_schema.json', 'r') as f:
table_schema = json.loads(f.read())
desc = 'Results of performance benchmarks.'
return big_query_utils.create_table2(bq, _PROJECT_ID, dataset_id,
table_id, table_schema, desc)
def _flatten_result_inplace(scenario_result):
"""Bigquery is not really great for handling deeply nested data
and repeated fields. To maintain values of some fields while keeping
the schema relatively simple, we artificially leave some of the fields
as JSON strings.
"""
scenario_result['scenario']['clientConfig'] = json.dumps(scenario_result['scenario']['clientConfig'])
scenario_result['scenario']['serverConfig'] = json.dumps(scenario_result['scenario']['serverConfig'])
scenario_result['latencies'] = json.dumps(scenario_result['latencies'])
for stats in scenario_result['clientStats']:
stats['latencies'] = json.dumps(stats['latencies'])
scenario_result['serverCores'] = json.dumps(scenario_result['serverCores'])
argp = argparse.ArgumentParser(description='Upload result to big query.')
argp.add_argument('--bq_result_table', required=True, default=None, type=str,
help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--file_to_upload', default='scenario_result.json', type=str,
help='Report file to upload.')
args = argp.parse_args()
dataset_id, table_id = args.bq_result_table.split('.', 2)
_upload_scenario_result_to_bigquery(dataset_id, table_id, args.file_to_upload)
print 'Successfully uploaded %s to BigQuery.\n' % args.file_to_upload

@ -0,0 +1,40 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set -ex
cd $(dirname $0)/../../..
bins/opt/qps_json_driver "$@"
if [ "$BQ_RESULT_TABLE" != "" ]
then
tools/run_tests/performance/bq_upload_result.py --bq_result_table="$BQ_RESULT_TABLE"
fi

@ -0,0 +1,202 @@
[
{
"name": "metadata",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "buildNumber",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "buildUrl",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "jobName",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "gitCommit",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "gitActualCommit",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "created",
"type": "TIMESTAMP",
"mode": "NULLABLE"
}
]
},
{
"name": "scenario",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "name",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "clientConfig",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "numClients",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "serverConfig",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "numServers",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "warmupSeconds",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "benchmarkSeconds",
"type": "INTEGER",
"mode": "NULLABLE"
}
]
},
{
"name": "latencies",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "clientStats",
"type": "RECORD",
"mode": "REPEATED",
"fields": [
{
"name": "latencies",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "timeElapsed",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "timeUser",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "timeSystem",
"type": "FLOAT",
"mode": "NULLABLE"
}
]
},
{
"name": "serverStats",
"type": "RECORD",
"mode": "REPEATED",
"fields": [
{
"name": "timeElapsed",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "timeUser",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "timeSystem",
"type": "FLOAT",
"mode": "NULLABLE"
}
]
},
{
"name": "serverCores",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "summary",
"type": "RECORD",
"mode": "NULLABLE",
"fields": [
{
"name": "qps",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "qps_per_server_core",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "server_system_time",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "server_user_time",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "client_system_time",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "client_user_time",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "latency_50",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "latency_90",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "latency_95",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "latency_99",
"type": "FLOAT",
"mode": "NULLABLE"
},
{
"name": "latency_999",
"type": "FLOAT",
"mode": "NULLABLE"
}
]
}
]

@ -93,15 +93,19 @@ def create_qpsworker_job(language, shortname=None,
return QpsWorkerJob(jobspec, language, host_and_port)
def create_scenario_jobspec(scenario_json, workers, remote_host=None):
def create_scenario_jobspec(scenario_json, workers, remote_host=None,
bq_result_table=None):
"""Runs one scenario using QPS driver."""
# setting QPS_WORKERS env variable here makes sure it works with SSH too.
cmd = 'QPS_WORKERS="%s" bins/opt/qps_json_driver ' % ','.join(workers)
cmd += '--scenarios_json=%s' % pipes.quote(json.dumps({'scenarios': [scenario_json]}))
cmd += ' --scenario_result_file=scenario_result.json'
cmd = 'QPS_WORKERS="%s" ' % ','.join(workers)
if bq_result_table:
cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
cmd += 'tools/run_tests/performance/run_qps_driver.sh '
cmd += '--scenarios_json=%s ' % pipes.quote(json.dumps({'scenarios': [scenario_json]}))
cmd += '--scenario_result_file=scenario_result.json'
if remote_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd))
return jobset.JobSpec(
cmdline=[cmd],
@ -117,7 +121,7 @@ def create_quit_jobspec(workers, remote_host=None):
cmd = 'QPS_WORKERS="%s" bins/opt/qps_driver --quit' % ','.join(workers)
if remote_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && %s"' % (user_at_host, cmd)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd))
return jobset.JobSpec(
cmdline=[cmd],
@ -226,7 +230,8 @@ def start_qpsworkers(languages, worker_hosts):
for worker_idx, worker in enumerate(workers)]
def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*'):
def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
bq_result_table=None):
"""Create jobspecs for scenarios to run."""
scenarios = []
for language in languages:
@ -248,7 +253,8 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*'):
workers[idx] = workers_by_lang[custom_server_lang][idx]
scenario = create_scenario_jobspec(scenario_json,
workers,
remote_host=remote_host)
remote_host=remote_host,
bq_result_table=bq_result_table)
scenarios.append(scenario)
# the very last scenario requests shutting down the workers.
@ -290,6 +296,8 @@ argp.add_argument('--remote_worker_host',
help='Worker hosts where to start QPS workers.')
argp.add_argument('-r', '--regex', default='.*', type=str,
help='Regex to select scenarios to run.')
argp.add_argument('--bq_result_table', default=None, type=str,
help='Bigquery "dataset.table" to upload results to.')
args = argp.parse_args()
@ -298,6 +306,7 @@ languages = set(scenario_config.LANGUAGES[l]
scenario_config.LANGUAGES.iterkeys() if x == 'all' else [x]
for x in args.language))
# Put together set of remote hosts where to run and build
remote_hosts = set()
if args.remote_worker_host:
@ -329,7 +338,8 @@ try:
scenarios = create_scenarios(languages,
workers_by_lang=worker_addresses,
remote_host=args.remote_driver_host,
regex=args.regex)
regex=args.regex,
bq_result_table=args.bq_result_table)
if not scenarios:
raise Exception('No scenarios to run')

Loading…
Cancel
Save