Change RUN_ID type to string to allow for a non-numeric run_id

pull/5402/head
Sree Kuchibhotla 9 years ago
parent 559e45becd
commit 2715a39a2e
  1. tools/big_query/big_query_utils.py (2)
  2. tools/gke/run_stress_tests_on_gke.py (103)
  3. tools/run_tests/stress_test/run_server.py (5)
  4. tools/run_tests/stress_test/stress_test_utils.py (17)

@@ -135,6 +135,6 @@ def sync_query_job(big_query, project_id, query, timeout=5000):
 
 # List of (column name, column type, description) tuples
 def make_row(unique_row_id, row_values_dict):
-  """row_values_dict is a dictionar of column name and column value.
+  """row_values_dict is a dictionary of column name and column value.
   """
   return {'insertId': unique_row_id, 'json': row_values_dict}
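
For context, a minimal sketch (not part of this diff) of how rows built this way are handed to BigQuery's streaming-insert endpoint via google-api-python-client; insert_row_sketch and its parameters are hypothetical:

import uuid

def insert_row_sketch(big_query, project_id, dataset_id, table_id,
                      row_values_dict):
  # 'insertId' lets BigQuery de-duplicate retried inserts of the same logical
  # row, so a fresh UUID per row is a safe default.
  row = {'insertId': str(uuid.uuid4()), 'json': row_values_dict}
  return big_query.tabledata().insertAll(projectId=project_id,
                                         datasetId=dataset_id,
                                         tableId=table_id,
                                         body={'rows': [row]}).execute()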

@@ -33,11 +33,17 @@ import subprocess
 import sys
 import time
 
+stress_test_utils_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../run_tests/stress_test'))
+sys.path.append(stress_test_utils_dir)
+from stress_test_utils import BigQueryHelper
+
 import kubernetes_api
 
 GRPC_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
 os.chdir(GRPC_ROOT)
 
 class BigQuerySettings:
   def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
@@ -283,27 +289,16 @@ def _launch_client(gcp_project_id, docker_image_name, bq_settings,
   return True
 
-def _launch_server_and_client(gcp_project_id, docker_image_name,
+def _launch_server_and_client(bq_settings, gcp_project_id, docker_image_name,
                               num_client_instances):
-  # == Big Query tables related settings (Common for both server and client) ==
-  # Create a unique id for this run (Note: Using timestamp instead of UUID to
-  # make it easier to deduce the date/time of the run just by looking at the run
-  # run id. This is useful in debugging when looking at records in Biq query)
-  run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
-  dataset_id = 'stress_test_%s' % run_id
-  summary_table_id = 'summary'
-  qps_table_id = 'qps'
-  bq_settings = BigQuerySettings(run_id, dataset_id, summary_table_id,
-                                 qps_table_id)
-
   # Start kubernetes proxy
   kubernetes_api_port = 9001
   kubernetes_proxy = KubernetesProxy(kubernetes_api_port)
   kubernetes_proxy.start()
 
+  # Number of seconds to wait for the GKE image to start and warm up
+  image_warmup_secs = 60
+
   server_pod_name = 'stress-server'
   server_port = 8080
   is_success = _launch_server(gcp_project_id, docker_image_name, bq_settings,
@@ -315,7 +310,8 @@ def _launch_server_and_client(gcp_project_id, docker_image_name,
   # Server takes a while to start.
   # TODO(sree) Use Kubernetes API to query the status of the server instead of
   # sleeping
-  time.sleep(60)
+  print 'Waiting for %s seconds for the server to start...' % image_warmup_secs
+  time.sleep(image_warmup_secs)
 
   # Launch client
   server_address = '%s.default.svc.cluster.local:%d' % (server_pod_name,
@@ -329,6 +325,8 @@ def _launch_server_and_client(gcp_project_id, docker_image_name,
     print 'Error in launching client(s)'
     return False
 
+  print 'Waiting for %s seconds for the client images to start...' % image_warmup_secs
+  time.sleep(image_warmup_secs)
+
   return True
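
One way the TODO above could go, as a hedged sketch: poll the Kubernetes v1 REST API through the kubectl proxy that KubernetesProxy starts on port 9001, rather than sleeping a fixed warmup period. The helper name, timeout, and poll interval are assumptions:

import json
import time
import urllib2

def wait_for_pod_running_sketch(pod_name, api_port=9001, timeout_secs=120):
  # Poll GET /api/v1/namespaces/default/pods/<name> via the local kubectl
  # proxy until the pod reports phase 'Running'.
  url = 'http://localhost:%d/api/v1/namespaces/default/pods/%s' % (api_port,
                                                                   pod_name)
  deadline = time.time() + timeout_secs
  while time.time() < deadline:
    try:
      pod = json.loads(urllib2.urlopen(url).read())
      if pod.get('status', {}).get('phase') == 'Running':
        return True
    except urllib2.URLError:
      pass  # Proxy or pod not reachable yet; retry below
    time.sleep(5)
  return False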
@@ -359,31 +357,68 @@ def _build_and_push_docker_image(gcp_project_id, docker_image_name, tag_name):
 # TODO(sree): This is just to test the above APIs. Rewrite this to make
 # everything configurable (like image names / number of instances etc)
-def test_run():
-  image_name = 'grpc_stress_test'
-  gcp_project_id = 'sree-gce'
-  tag_name = 'gcr.io/%s/%s' % (gcp_project_id, image_name)
-  num_client_instances = 3
+def run_test(skip_building_image, gcp_project_id, image_name, tag_name,
+             num_client_instances, poll_interval_secs, total_duration_secs):
+  if not skip_building_image:
+    is_success = _build_docker_image(image_name, tag_name)
+    if not is_success:
+      return False
 
-  is_success = _build_docker_image(image_name, tag_name)
-  if not is_success:
-    return
+    is_success = _push_docker_image_to_gke_registry(tag_name)
+    if not is_success:
+      return False
 
-  is_success = _push_docker_image_to_gke_registry(tag_name)
-  if not is_success:
-    return
+  # == Big Query tables related settings (Common for both server and client) ==
+  # Create a unique id for this run (Note: Using timestamp instead of UUID to
+  # make it easier to deduce the date/time of the run just by looking at the
+  # run id. This is useful in debugging when looking at records in BigQuery)
+  run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
+  dataset_id = 'stress_test_%s' % run_id
+  summary_table_id = 'summary'
+  qps_table_id = 'qps'
+  bq_settings = BigQuerySettings(run_id, dataset_id, summary_table_id,
+                                 qps_table_id)
 
-  is_success = _launch_server_and_client(gcp_project_id, tag_name,
-                                         num_client_instances)
+  bq_helper = BigQueryHelper(run_id, '', '', gcp_project_id, dataset_id,
+                             summary_table_id, qps_table_id)
+  bq_helper.initialize()
+
+  is_success = _launch_server_and_client(bq_settings, gcp_project_id, tag_name,
+                                         num_client_instances)
   if not is_success:
     return False
 
-  # Run the test for 2 mins
-  time.sleep(120)
-
-  is_success = _delete_server_and_client(num_client_instances)
-
-  if not is_success:
-    return
+  start_time = datetime.datetime.now()
+  end_time = start_time + datetime.timedelta(seconds=total_duration_secs)
+  while True:
+    if datetime.datetime.now() > end_time:
+      print 'Test was run for %d seconds' % total_duration_secs
+      break
+
+    # Check if either stress server or clients have failed
+    if bq_helper.check_if_any_tests_failed():
+      is_success = False
+      print 'Some tests failed.'
+      break
+
+    # Things seem to be running fine. Wait until next poll time to check the
+    # status
+    time.sleep(poll_interval_secs)
+
+  # Print BigQuery tables
+  bq_helper.print_summary_records()
+  bq_helper.print_qps_records()
+
+  _delete_server_and_client(num_client_instances)
 
   return is_success
 
 if __name__ == '__main__':
-  test_run()
+  image_name = 'grpc_stress_test'
+  gcp_project_id = 'sree-gce'
+  tag_name = 'gcr.io/%s/%s' % (gcp_project_id, image_name)
+  num_client_instances = 3
+  poll_interval_secs = 5
+  test_duration_secs = 150
+  run_test(True, gcp_project_id, image_name, tag_name, num_client_instances,
+           poll_interval_secs, test_duration_secs)
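
For reference, the timestamp-based run_id built in run_test() is a plain string (the sample values below are illustrative), which is why the schema changes later in this commit switch run_id from INTEGER to STRING and why every query now quotes it:

import datetime

run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
print 'run id:  %s' % run_id              # e.g. 2016_03_01_14_05_09
print 'dataset: stress_test_%s' % run_id  # e.g. stress_test_2016_03_01_14_05_09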

@@ -72,6 +72,11 @@ def run_server():
   logfile_name = env.get('LOGFILE_NAME')
 
+  print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, '
+        'summary_table_id: %s, qps_table_id: %s') % (
+            pod_name, project_id, run_id, dataset_id, summary_table_id,
+            qps_table_id)
+
   bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
                              dataset_id, summary_table_id, qps_table_id)
   bq_helper.initialize()

@@ -43,11 +43,13 @@ bq_utils_dir = os.path.abspath(os.path.join(
 sys.path.append(bq_utils_dir)
 import big_query_utils as bq_utils
 
+
 class EventType:
   STARTING = 'STARTING'
   SUCCESS = 'SUCCESS'
   FAILURE = 'FAILURE'
 
+
 class BigQueryHelper:
   """Helper class for the stress test wrappers to interact with BigQuery.
   """
@@ -101,9 +103,9 @@ class BigQueryHelper:
                                self.qps_table_id, [row])
 
   def check_if_any_tests_failed(self, num_query_retries=3):
-    query = ('SELECT event_type FROM %s.%s WHERE run_id = %s AND '
-             'event_type="%s"') % (self.dataset_id, self.summary_table_id,
-                                   self.run_id, EventType.FAILURE)
+    query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
+             'event_type="%s"') % (self.dataset_id, self.summary_table_id,
+                                   self.run_id, EventType.FAILURE)
     query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
     page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
         num_retries=num_query_retries)
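
To see what the quoting buys, here is the query the new format string produces for a hypothetical run; an unquoted timestamp like 2016_03_01_14_05_09 would not parse as a SQL literal against a STRING column:

# Illustration with hypothetical values; the quotes around %s are the point
# of this change, since run_id is no longer numeric.
dataset_id = 'stress_test_2016_03_01_14_05_09'
summary_table_id = 'summary'
run_id = '2016_03_01_14_05_09'
query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
         'event_type="%s"') % (dataset_id, summary_table_id, run_id, 'FAILURE')
# -> SELECT event_type FROM stress_test_2016_03_01_14_05_09.summary
#    WHERE run_id = '2016_03_01_14_05_09' AND event_type="FAILURE"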
@@ -119,7 +121,7 @@ class BigQueryHelper:
     print 'Run Id', self.run_id
     print line
     query = ('SELECT pod_name, image_type, event_type, event_date, details'
-             ' FROM %s.%s WHERE run_id = %s ORDER by event_date;') % (
+             ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
                 self.dataset_id, self.summary_table_id, self.run_id)
     query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
@@ -147,8 +149,9 @@ class BigQueryHelper:
     print 'Run Id: ', self.run_id
     print line
     query = (
-        'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = %s ORDER '
-        'by recorded_at;') % (self.dataset_id, self.qps_table_id, self.run_id)
+        'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
+        'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
+                                    self.run_id)
     query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
     print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
     print line
@@ -167,7 +170,7 @@ class BigQueryHelper:
   def __create_summary_table(self):
     summary_table_schema = [
-        ('run_id', 'INTEGER', 'Test run id'),
+        ('run_id', 'STRING', 'Test run id'),
         ('image_type', 'STRING', 'Client or Server?'),
         ('pod_name', 'STRING', 'GKE pod hosting this image'),
         ('event_date', 'STRING', 'The date of this event'),
@@ -182,7 +185,7 @@ class BigQueryHelper:
   def __create_qps_table(self):
     qps_table_schema = [
-        ('run_id', 'INTEGER', 'Test run id'),
+        ('run_id', 'STRING', 'Test run id'),
         ('pod_name', 'STRING', 'GKE pod hosting this image'),
         ('recorded_at', 'STRING', 'Metrics recorded at time'),
         ('qps', 'INTEGER', 'Queries per second')
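
A minimal sketch, assuming the big_query_utils helpers expand each (column name, column type, description) tuple into the field dicts that BigQuery's tables().insert API expects; make_fields_sketch is a hypothetical name:

def make_fields_sketch(table_schema):
  # Expand (column name, column type, description) tuples into BigQuery
  # schema field dicts.
  return [{'name': name, 'type': col_type, 'description': desc}
          for (name, col_type, desc) in table_schema]

# make_fields_sketch([('run_id', 'STRING', 'Test run id')])
# -> [{'name': 'run_id', 'type': 'STRING', 'description': 'Test run id'}]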
