Merge pull request #5402 from sreecha/stress_test_scripts

Stress test scripts to launch in GKE
pull/5483/head
Jan Tattermusch 9 years ago
commit 8ba6a6b321
  1. test/cpp/interop/metrics_client.cc (43)
  2. test/cpp/util/metrics_server.cc (2)
  3. tools/dockerfile/grpc_interop_stress_cxx/Dockerfile (5)
  4. tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh (2)
  5. tools/gcp/stress_test/run_client.py (187)
  6. tools/gcp/stress_test/run_server.py (120)
  7. tools/gcp/stress_test/stress_test_utils.py (197)
  8. tools/gcp/utils/big_query_utils.py (140)
  9. tools/gcp/utils/kubernetes_api.py (83)
  10. tools/jenkins/build_interop_stress_image.sh (3)
  11. tools/run_tests/stress_test/run_stress_tests_on_gke.py (556)

@@ -1,6 +1,6 @@
/*
*
* Copyright 2015, Google Inc.
* Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,39 +37,45 @@
#include <gflags/gflags.h>
#include <grpc++/grpc++.h>
#include "test/cpp/util/metrics_server.h"
#include "test/cpp/util/test_config.h"
#include "src/proto/grpc/testing/metrics.grpc.pb.h"
#include "src/proto/grpc/testing/metrics.pb.h"
#include "test/cpp/util/metrics_server.h"
#include "test/cpp/util/test_config.h"
DEFINE_string(metrics_server_address, "",
"The metrics server addresses in the fomrat <hostname>:<port>");
DEFINE_bool(total_only, false,
"If true, this prints only the total value of all gauges");
int kDeadlineSecs = 10;
using grpc::testing::EmptyMessage;
using grpc::testing::GaugeResponse;
using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl;
void PrintMetrics(const grpc::string& server_address) {
gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str());
std::shared_ptr<grpc::Channel> channel(
grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
std::unique_ptr<MetricsService::Stub> stub(MetricsService::NewStub(channel));
// Prints the values of all Gauges (unless total_only is set to 'true' in which
// case this only prints the sum of all gauge values).
bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
grpc::ClientContext context;
EmptyMessage message;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs);
context.set_deadline(deadline);
std::unique_ptr<grpc::ClientReader<GaugeResponse>> reader(
stub->GetAllGauges(&context, message));
GaugeResponse gauge_response;
long overall_qps = 0;
int idx = 0;
while (reader->Read(&gauge_response)) {
if (gauge_response.value_case() == GaugeResponse::kLongValue) {
gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx,
gauge_response.name().c_str(), gauge_response.long_value());
if (!total_only) {
gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(),
gauge_response.long_value());
}
overall_qps += gauge_response.long_value();
} else {
gpr_log(GPR_INFO, "Gauge %s is not a long value",
@@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) {
}
}
gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps);
gpr_log(GPR_INFO, "%ld", overall_qps);
const grpc::Status status = reader->Finish();
if (!status.ok()) {
gpr_log(GPR_ERROR, "Error in getting metrics from the client");
}
return status.ok();
}
int main(int argc, char** argv) {
@@ -97,7 +105,12 @@ int main(int argc, char** argv) {
return 1;
}
PrintMetrics(FLAGS_metrics_server_address);
std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
FLAGS_metrics_server_address, grpc::InsecureChannelCredentials()));
if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) {
return 1;
}
return 0;
}
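
The reworked metrics_client logs the total QPS as the final value of its output when --total_only is set, and the GKE client wrapper introduced below (run_client.py) parses the trailing digits of that output. A rough Python sketch of that consumption pattern (the binary path is hypothetical):

import re
import subprocess

# Hypothetical binary path; inside the stress test image it lives under bins/opt.
cmd = ['/var/local/git/grpc/bins/opt/metrics_client',
       '--metrics_server_address=localhost:8081', '--total_only=true']
# gpr_log() writes to stderr, so fold stderr into stdout before parsing.
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
match = re.search(r'\d+$', output.strip())
print 'overall qps:', int(match.group()) if match else 0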

@@ -57,7 +57,7 @@ long Gauge::Get() {
grpc::Status MetricsServiceImpl::GetAllGauges(
ServerContext* context, const EmptyMessage* request,
ServerWriter<GaugeResponse>* writer) {
gpr_log(GPR_INFO, "GetAllGauges called");
gpr_log(GPR_DEBUG, "GetAllGauges called");
std::lock_guard<std::mutex> lock(mu_);
for (auto it = gauges_.begin(); it != gauges_.end(); it++) {

@@ -59,6 +59,8 @@ RUN apt-get update && apt-get install -y \
wget \
zip && apt-get clean
RUN easy_install -U pip
# Prepare ccache
RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
RUN ln -s /usr/bin/ccache /usr/local/bin/g++
@@ -71,5 +73,8 @@ RUN ln -s /usr/bin/ccache /usr/local/bin/clang++
# C++ dependencies
RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang
# Google Cloud platform API libraries (for BigQuery)
RUN pip install --upgrade google-api-python-client
# Define the default command.
CMD ["bash"]

@@ -42,4 +42,4 @@ cd /var/local/git/grpc
make install-certs
# build C++ interop stress client, interop client and server
make stress_test interop_client interop_server
make stress_test metrics_client interop_client interop_server

@@ -0,0 +1,187 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
import os
import re
import select
import subprocess
import sys
import time
from stress_test_utils import EventType
from stress_test_utils import BigQueryHelper
# TODO (sree): Write a python grpc client to directly query the metrics instead
# of calling metrics_client
def _get_qps(metrics_cmd):
qps = 0
try:
# Note: gpr_log() writes even non-error messages to stderr stream. So it is
# important that we set stderr=subprocess.STDOUT
p = subprocess.Popen(args=metrics_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
(out_str, err_str) = p.communicate()
retcode = p.returncode
if retcode != 0:
print 'Error in reading metrics information'
print 'Output: ', out_str
else:
# The overall qps is printed at the end of the line
m = re.search('\d+$', out_str)
qps = int(m.group()) if m else 0
except Exception as ex:
print 'Exception while reading metrics information: ' + str(ex)
return qps
def run_client():
"""This is a wrapper around the stress test client and performs the following:
1) Create the following two tables in Big Query:
(i) Summary table: To record events like the test started, completed
successfully or failed
(ii) Qps table: To periodically record the QPS sent by this client
2) Start the stress test client and add a row in the Big Query summary
table
3) Once every few seconds (as specified by poll_interval_secs) poll
the status of the stress test client process and perform the
following:
3.1) If the process is still running, get the current qps by invoking
the metrics client program and add a row in the Big Query
Qps table. Sleep for a duration specified by poll_interval_secs
3.2) If the process exited successfully, add a row in the Big Query
Summary table and exit
3.3) If the process failed, add a row in Big Query summary table and
wait forever.
NOTE: This script typically runs inside a GKE pod which means
that the pod gets destroyed when the script exits. However, in
case the stress test client fails, we would not want the pod to
be destroyed (since we might want to connect to the pod for
examining logs). This is the reason why the script waits forever
in case of failures
"""
env = dict(os.environ)
image_type = env['STRESS_TEST_IMAGE_TYPE']
image_name = env['STRESS_TEST_IMAGE']
args_str = env['STRESS_TEST_ARGS_STR']
metrics_client_image = env['METRICS_CLIENT_IMAGE']
metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR']
run_id = env['RUN_ID']
pod_name = env['POD_NAME']
logfile_name = env.get('LOGFILE_NAME')
poll_interval_secs = float(env['POLL_INTERVAL_SECS'])
project_id = env['GCP_PROJECT_ID']
dataset_id = env['DATASET_ID']
summary_table_id = env['SUMMARY_TABLE_ID']
qps_table_id = env['QPS_TABLE_ID']
bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
dataset_id, summary_table_id, qps_table_id)
bq_helper.initialize()
# Create BigQuery Dataset and Tables: Summary Table and Metrics Table
if not bq_helper.setup_tables():
print 'Error in creating BigQuery tables'
return
start_time = datetime.datetime.now()
logfile = None
details = 'Logging to stdout'
if logfile_name is not None:
print 'Opening logfile: %s ...' % logfile_name
details = 'Logfile: %s' % logfile_name
logfile = open(logfile_name, 'w')
# Update status that the test is starting (in the status table)
bq_helper.insert_summary_row(EventType.STARTING, details)
metrics_cmd = [metrics_client_image
] + [x for x in metrics_client_args_str.split()]
stress_cmd = [image_name] + [x for x in args_str.split()]
print 'Launching process %s ...' % stress_cmd
stress_p = subprocess.Popen(args=stress_cmd,
stdout=logfile,
stderr=subprocess.STDOUT)
qps_history = [1, 1, 1] # Maintain the last 3 qps readings
qps_history_idx = 0 # Index into the qps_history list
is_error = False
while True:
# Check if stress_client is still running. If so, collect metrics and upload
# to BigQuery status table
if stress_p.poll() is not None:
end_time = datetime.datetime.now().isoformat()
event_type = EventType.SUCCESS
details = 'End time: %s' % end_time
if stress_p.returncode != 0:
event_type = EventType.FAILURE
details = 'Return code = %d. End time: %s' % (stress_p.returncode,
end_time)
is_error = True
bq_helper.insert_summary_row(event_type, details)
print details
break
# Stress client still running. Get metrics
qps = _get_qps(metrics_cmd)
qps_recorded_at = datetime.datetime.now().isoformat()
print 'qps: %d at %s' % (qps, qps_recorded_at)
# If QPS has been zero for the last 3 iterations, flag it as error and exit
qps_history[qps_history_idx] = qps
qps_history_idx = (qps_history_idx + 1) % len(qps_history)
if sum(qps_history) == 0:
details = 'QPS has been zero for the last %d seconds - as of: %s' % (
poll_interval_secs * 3, qps_recorded_at)
is_error = True
bq_helper.insert_summary_row(EventType.FAILURE, details)
print details
break
# Upload QPS metrics to BigQuery
bq_helper.insert_qps_row(qps, qps_recorded_at)
time.sleep(poll_interval_secs)
if is_error:
print 'Waiting indefinitely..'
select.select([], [], [])
print 'Completed'
return
if __name__ == '__main__':
run_client()
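
run_client.py reads all of its parameters from environment variables, which the launcher script (run_stress_tests_on_gke.py, further below) injects into each client pod. A minimal local sketch, with entirely hypothetical values:

import os

# Hypothetical values; in GKE these are injected into the pod by the launcher.
os.environ.update({
    'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
    'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
    'STRESS_TEST_ARGS_STR': '--server_addresses=localhost:8080',
    'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
    'METRICS_CLIENT_ARGS_STR':
        '--metrics_server_address=localhost:8081 --total_only=true',
    'RUN_ID': '2016_01_01_00_00_00',
    'POD_NAME': 'stress-client-1',
    'POLL_INTERVAL_SECS': '60',
    'GCP_PROJECT_ID': 'my-gcp-project',  # hypothetical project id
    'DATASET_ID': 'stress_test_2016_01_01_00_00_00',
    'SUMMARY_TABLE_ID': 'summary',
    'QPS_TABLE_ID': 'qps',
})

# Assumes this runs from tools/gcp/stress_test so the module can be imported.
from run_client import run_client
run_client()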

@@ -0,0 +1,120 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
import os
import select
import subprocess
import sys
import time
from stress_test_utils import BigQueryHelper
from stress_test_utils import EventType
def run_server():
"""This is a wrapper around the interop server and performs the following:
1) Create a 'Summary table' in Big Query to record events like the server
started, completed successfully or failed. NOTE: This also creates
another table called the QPS table which is currently NOT needed on the
server (it is needed on the stress test clients)
2) Start the server process and add a row in Big Query summary table
3) Wait for the server process to terminate. The server process does not
terminate unless there is an error.
If the server process terminated with a failure, add a row in Big Query
and wait forever.
NOTE: This script typically runs inside a GKE pod which means that the
pod gets destroyed when the script exits. However, in case the server
process fails, we would not want the pod to be destroyed (since we
might want to connect to the pod for examining logs). This is the
reason why the script waits forever in case of failures.
"""
# Read the parameters from environment variables
env = dict(os.environ)
run_id = env['RUN_ID'] # The unique run id for this test
image_type = env['STRESS_TEST_IMAGE_TYPE']
image_name = env['STRESS_TEST_IMAGE']
args_str = env['STRESS_TEST_ARGS_STR']
pod_name = env['POD_NAME']
project_id = env['GCP_PROJECT_ID']
dataset_id = env['DATASET_ID']
summary_table_id = env['SUMMARY_TABLE_ID']
qps_table_id = env['QPS_TABLE_ID']
logfile_name = env.get('LOGFILE_NAME')
print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, '
'summary_table_id: %s, qps_table_id: %s') % (
pod_name, project_id, run_id, dataset_id, summary_table_id,
qps_table_id)
bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
dataset_id, summary_table_id, qps_table_id)
bq_helper.initialize()
# Create BigQuery Dataset and Tables: Summary Table and Metrics Table
if not bq_helper.setup_tables():
print 'Error in creating BigQuery tables'
return
start_time = datetime.datetime.now()
logfile = None
details = 'Logging to stdout'
if logfile_name is not None:
print 'Opening log file: ', logfile_name
logfile = open(logfile_name, 'w')
details = 'Logfile: %s' % logfile_name
# Update status that the test is starting (in the status table)
bq_helper.insert_summary_row(EventType.STARTING, details)
stress_cmd = [image_name] + [x for x in args_str.split()]
print 'Launching process %s ...' % stress_cmd
stress_p = subprocess.Popen(args=stress_cmd,
stdout=logfile,
stderr=subprocess.STDOUT)
returncode = stress_p.wait()
if returncode != 0:
end_time = datetime.datetime.now().isoformat()
event_type = EventType.FAILURE
details = 'Returncode: %d; End time: %s' % (returncode, end_time)
bq_helper.insert_summary_row(event_type, details)
print 'Waiting indefinitely..'
select.select([], [], [])
return returncode
if __name__ == '__main__':
run_server()
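
run_server.py is driven the same way; a hedged sketch of the environment it expects (values are hypothetical):

import os

os.environ.update({
    'STRESS_TEST_IMAGE_TYPE': 'SERVER',
    'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
    'STRESS_TEST_ARGS_STR': '--port=8080',
    'RUN_ID': '2016_01_01_00_00_00',     # hypothetical run id
    'POD_NAME': 'stress-server',
    'GCP_PROJECT_ID': 'my-gcp-project',  # hypothetical project id
    'DATASET_ID': 'stress_test_2016_01_01_00_00_00',
    'SUMMARY_TABLE_ID': 'summary',
    'QPS_TABLE_ID': 'qps',
})

# Assumes this runs from tools/gcp/stress_test so the module can be imported.
from run_server import run_server
run_server()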

@@ -0,0 +1,197 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import datetime
import json
import os
import re
import select
import subprocess
import sys
import time
# Import big_query_utils module
bq_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../utils'))
sys.path.append(bq_utils_dir)
import big_query_utils as bq_utils
class EventType:
STARTING = 'STARTING'
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'
class BigQueryHelper:
"""Helper class for the stress test wrappers to interact with BigQuery.
"""
def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
summary_table_id, qps_table_id):
self.run_id = run_id
self.image_type = image_type
self.pod_name = pod_name
self.project_id = project_id
self.dataset_id = dataset_id
self.summary_table_id = summary_table_id
self.qps_table_id = qps_table_id
def initialize(self):
self.bq = bq_utils.create_big_query()
def setup_tables(self):
return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
and self.__create_summary_table() \
and self.__create_qps_table()
def insert_summary_row(self, event_type, details):
row_values_dict = {
'run_id': self.run_id,
'image_type': self.image_type,
'pod_name': self.pod_name,
'event_date': datetime.datetime.now().isoformat(),
'event_type': event_type,
'details': details
}
# row_unique_id is something that uniquely identifies the row (BigQuery uses
# it for duplicate detection).
row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
row = bq_utils.make_row(row_unique_id, row_values_dict)
return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
self.summary_table_id, [row])
def insert_qps_row(self, qps, recorded_at):
row_values_dict = {
'run_id': self.run_id,
'pod_name': self.pod_name,
'recorded_at': recorded_at,
'qps': qps
}
# row_unique_id is something that uniquely identifies the row (BigQuery uses
# it for duplicate detection).
row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
row = bq_utils.make_row(row_unique_id, row_values_dict)
return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
self.qps_table_id, [row])
def check_if_any_tests_failed(self, num_query_retries=3):
query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
'event_type="%s"') % (self.dataset_id, self.summary_table_id,
self.run_id, EventType.FAILURE)
query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
num_retries=num_query_retries)
num_failures = int(page['totalRows'])
print 'num rows: ', num_failures
return num_failures > 0
def print_summary_records(self, num_query_retries=3):
line = '-' * 120
print line
print 'Summary records'
print 'Run Id: ', self.run_id
print 'Dataset Id: ', self.dataset_id
print line
query = ('SELECT pod_name, image_type, event_type, event_date, details'
' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
self.dataset_id, self.summary_table_id, self.run_id)
query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
print '{:<25} {:<12} {:<12} {:<30} {}'.format(
'Pod name', 'Image type', 'Event type', 'Date', 'Details')
print line
page_token = None
while True:
page = self.bq.jobs().getQueryResults(
pageToken=page_token,
**query_job['jobReference']).execute(num_retries=num_query_retries)
rows = page.get('rows', [])
for row in rows:
print '{:<25} {:<12} {:<12} {:<30} {}'.format(
row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
row['f'][3]['v'], row['f'][4]['v'])
page_token = page.get('pageToken')
if not page_token:
break
def print_qps_records(self, num_query_retries=3):
line = '-' * 80
print line
print 'QPS Summary'
print 'Run Id: ', self.run_id
print 'Dataset Id: ', self.dataset_id
print line
query = (
'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
self.run_id)
query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
print line
page_token = None
while True:
page = self.bq.jobs().getQueryResults(
pageToken=page_token,
**query_job['jobReference']).execute(num_retries=num_query_retries)
rows = page.get('rows', [])
for row in rows:
print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
row['f'][2]['v'])
page_token = page.get('pageToken')
if not page_token:
break
def __create_summary_table(self):
summary_table_schema = [
('run_id', 'STRING', 'Test run id'),
('image_type', 'STRING', 'Client or Server?'),
('pod_name', 'STRING', 'GKE pod hosting this image'),
('event_date', 'STRING', 'The date of this event'),
('event_type', 'STRING', 'STARTING/SUCCESS/FAILURE'),
('details', 'STRING', 'Any other relevant details')
]
desc = ('The table that contains STARTING/SUCCESS/FAILURE events for '
'the stress test clients and servers')
return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
self.summary_table_id, summary_table_schema,
desc)
def __create_qps_table(self):
qps_table_schema = [
('run_id', 'STRING', 'Test run id'),
('pod_name', 'STRING', 'GKE pod hosting this image'),
('recorded_at', 'STRING', 'Time at which this metric was recorded'),
('qps', 'INTEGER', 'Queries per second')
]
desc = 'The table that contains the QPS recorded at various intervals'
return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
self.qps_table_id, qps_table_schema, desc)
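
A sketch of how the wrapper scripts above use BigQueryHelper end to end (all identifiers below are hypothetical; the real ones come from environment variables):

from stress_test_utils import BigQueryHelper, EventType

bq_helper = BigQueryHelper('2016_01_01_00_00_00', 'CLIENT', 'stress-client-1',
                           'my-gcp-project', 'stress_test_2016_01_01_00_00_00',
                           'summary', 'qps')
bq_helper.initialize()        # builds the BigQuery service object
if bq_helper.setup_tables():  # creates the dataset, summary table and qps table
  bq_helper.insert_summary_row(EventType.STARTING, 'Logging to stdout')
  bq_helper.insert_qps_row(1200, '2016-01-01T00:01:00')  # hypothetical reading
  print 'any failures so far:', bq_helper.check_if_any_tests_failed()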

@@ -0,0 +1,140 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import json
import uuid
import httplib2
from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
NUM_RETRIES = 3
def create_big_query():
"""Authenticates with cloud platform and gets a BiqQuery service object
"""
creds = GoogleCredentials.get_application_default()
return discovery.build('bigquery', 'v2', credentials=creds)
def create_dataset(big_query, project_id, dataset_id):
is_success = True
body = {
'datasetReference': {
'projectId': project_id,
'datasetId': dataset_id
}
}
try:
dataset_req = big_query.datasets().insert(projectId=project_id, body=body)
dataset_req.execute(num_retries=NUM_RETRIES)
except HttpError as http_error:
if http_error.resp.status == 409:
print 'Warning: The dataset %s already exists' % dataset_id
else:
# Note: For more debugging info, print "http_error.content"
print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
is_success = False
return is_success
def create_table(big_query, project_id, dataset_id, table_id, table_schema,
description):
is_success = True
body = {
'description': description,
'schema': {
'fields': [{
'name': field_name,
'type': field_type,
'description': field_description
} for (field_name, field_type, field_description) in table_schema]
},
'tableReference': {
'datasetId': dataset_id,
'projectId': project_id,
'tableId': table_id
}
}
try:
table_req = big_query.tables().insert(projectId=project_id,
datasetId=dataset_id,
body=body)
res = table_req.execute(num_retries=NUM_RETRIES)
print 'Successfully created %s "%s"' % (res['kind'], res['id'])
except HttpError as http_error:
if http_error.resp.status == 409:
print 'Warning: Table %s already exists' % table_id
else:
print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
is_success = False
return is_success
def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
is_success = True
body = {'rows': rows_list}
try:
insert_req = big_query.tabledata().insertAll(projectId=project_id,
datasetId=dataset_id,
tableId=table_id,
body=body)
print body
res = insert_req.execute(num_retries=NUM_RETRIES)
print res
except HttpError as http_error:
print 'Error in inserting rows in the table %s' % table_id
is_success = False
return is_success
def sync_query_job(big_query, project_id, query, timeout=5000):
query_data = {'query': query, 'timeoutMs': timeout}
query_job = None
try:
query_job = big_query.jobs().query(
projectId=project_id,
body=query_data).execute(num_retries=NUM_RETRIES)
except HttpError as http_error:
print 'Query execute job failed with error: %s' % http_error
print http_error.content
return query_job
# List of (column name, column type, description) tuples
def make_row(unique_row_id, row_values_dict):
"""row_values_dict is a dictionary of column name and column value.
"""
return {'insertId': unique_row_id, 'json': row_values_dict}
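
big_query_utils can also be used on its own. A minimal sketch, assuming a hypothetical project, dataset and table:

import big_query_utils as bq_utils

bq = bq_utils.create_big_query()
project_id = 'my-gcp-project'       # hypothetical
dataset_id = 'stress_test_example'  # hypothetical
table_id = 'summary'
schema = [('run_id', 'STRING', 'Test run id'),
          ('details', 'STRING', 'Any other relevant details')]

if bq_utils.create_dataset(bq, project_id, dataset_id) and \
   bq_utils.create_table(bq, project_id, dataset_id, table_id, schema,
                         'Example summary table'):
  row = bq_utils.make_row('example_run_pod_STARTING',
                          {'run_id': 'example_run', 'details': 'hello'})
  bq_utils.insert_rows(bq, project_id, dataset_id, table_id, [row])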

@@ -1,5 +1,5 @@
#!/usr/bin/env python2.7
# Copyright 2015, Google Inc.
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -33,8 +33,9 @@ import json
_REQUEST_TIMEOUT_SECS = 10
def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
arg_list):
arg_list, env_dict):
"""Creates a string containing the Pod defintion as required by the Kubernetes API"""
body = {
'kind': 'Pod',
@@ -48,20 +49,23 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
{
'name': pod_name,
'image': image_name,
'ports': []
'ports': [{'containerPort': port,
'protocol': 'TCP'}
for port in container_port_list],
'imagePullPolicy': 'Always'
}
]
}
}
# Populate the 'ports' list
for port in container_port_list:
port_entry = {'containerPort': port, 'protocol': 'TCP'}
body['spec']['containers'][0]['ports'].append(port_entry)
env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()]
if len(env_list) > 0:
body['spec']['containers'][0]['env'] = env_list
# Add the 'Command' and 'Args' attributes if they are passed.
# Note:
# - 'Command' overrides the ENTRYPOINT in the Docker Image
# - 'Args' override the COMMAND in Docker image (yes, it is confusing!)
# - 'Args' override the CMD in Docker image (yes, it is confusing!)
if len(cmd_list) > 0:
body['spec']['containers'][0]['command'] = cmd_list
if len(arg_list) > 0:
@@ -70,7 +74,7 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
def _make_service_config(service_name, pod_name, service_port_list,
container_port_list, is_headless):
container_port_list, is_headless):
"""Creates a string containing the Service definition as required by the Kubernetes API.
NOTE:
@@ -124,6 +128,7 @@ def _print_connection_error(msg):
print('ERROR: Connection failed. Did you remember to run the Kubernetes proxy on '
'localhost (i.e. kubectl proxy --port=<proxy_port>)? Error: %s' % msg)
def _do_post(post_url, api_name, request_body):
"""Helper to do HTTP POST.
@@ -135,7 +140,9 @@ def _do_post(post_url, api_name, request_body):
"""
is_success = True
try:
r = requests.post(post_url, data=request_body, timeout=_REQUEST_TIMEOUT_SECS)
r = requests.post(post_url,
data=request_body,
timeout=_REQUEST_TIMEOUT_SECS)
if r.status_code == requests.codes.conflict:
print('WARN: Looks like the resource already exists. Api: %s, url: %s' %
(api_name, post_url))
@@ -143,7 +150,8 @@ def _do_post(post_url, api_name, request_body):
print('ERROR: %s API returned error. HTTP response: (%d) %s' %
(api_name, r.status_code, r.text))
is_success = False
except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -165,7 +173,8 @@ def _do_delete(del_url, api_name):
print('ERROR: %s API returned error. HTTP response: %s' %
(api_name, r.text))
is_success = False
except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
except (requests.exceptions.Timeout,
requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -179,12 +188,12 @@ def create_service(kube_host, kube_port, namespace, service_name, pod_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % (
kube_host, kube_port, namespace)
request_body = _make_service_config(service_name, pod_name, service_port_list,
container_port_list, is_headless)
container_port_list, is_headless)
return _do_post(post_url, 'Create Service', request_body)
def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
container_port_list, cmd_list, arg_list):
container_port_list, cmd_list, arg_list, env_dict):
"""Creates a Kubernetes Pod.
Note that it is generally NOT considered a good practice to directly create
@@ -200,7 +209,7 @@ def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port,
namespace)
request_body = _make_pod_config(pod_name, image_name, container_port_list,
cmd_list, arg_list)
cmd_list, arg_list, env_dict)
return _do_post(post_url, 'Create Pod', request_body)
@@ -214,3 +223,47 @@ def delete_pod(kube_host, kube_port, namespace, pod_name):
del_url = 'http://%s:%d/api/v1/namespaces/%s/pods/%s' % (kube_host, kube_port,
namespace, pod_name)
return _do_delete(del_url, 'Delete Pod')
def create_pod_and_service(kube_host, kube_port, namespace, pod_name,
image_name, container_port_list, cmd_list, arg_list,
env_dict, is_headless_service):
"""A helper function that creates a pod and a service (if pod creation was successful)."""
is_success = create_pod(kube_host, kube_port, namespace, pod_name, image_name,
container_port_list, cmd_list, arg_list, env_dict)
if not is_success:
print 'Error in creating Pod'
return False
is_success = create_service(
kube_host,
kube_port,
namespace,
pod_name, # Use pod_name for service
pod_name,
container_port_list, # Service port list same as container port list
container_port_list,
is_headless_service)
if not is_success:
print 'Error in creating Service'
return False
print 'Successfully created the pod/service %s' % pod_name
return True
def delete_pod_and_service(kube_host, kube_port, namespace, pod_name):
""" A helper function that calls delete_pod and delete_service """
is_success = delete_pod(kube_host, kube_port, namespace, pod_name)
if not is_success:
print 'Error in deleting pod %s' % pod_name
return False
# Note: the service name is assumed to be the same as the pod name
is_success = delete_service(kube_host, kube_port, namespace, pod_name)
if not is_success:
print 'Error in deleting service %s' % pod_name
return False
print 'Successfully deleted the Pod/Service: %s' % pod_name
return True
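
For completeness, a hedged sketch of driving kubernetes_api directly against a local 'kubectl proxy' (image name, ports and the env dict are hypothetical and abbreviated):

import kubernetes_api

# Assumes 'kubectl proxy --port=8001' is already running on localhost.
is_success = kubernetes_api.create_pod_and_service(
    'localhost', 8001, 'default',
    'stress-server',                           # pod (and service) name
    'gcr.io/my-gcp-project/grpc_stress_test',  # hypothetical image tag
    [8080],                                    # container ports to expose
    ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py'],
    [],                                        # no args; env carries the config
    {'STRESS_TEST_IMAGE_TYPE': 'SERVER'},      # abbreviated env for brevity
    True)                                      # headless service
if is_success:
  kubernetes_api.delete_pod_and_service('localhost', 8001, 'default',
                                        'stress-server')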

@@ -35,6 +35,8 @@ set -x
# Params:
# INTEROP_IMAGE - name of tag of the final interop image
# INTEROP_IMAGE_REPOSITORY_TAG - Optional. If set, the created image will be tagged using
# the command: 'docker tag $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG'
# BASE_NAME - base name used to locate the base Dockerfile and build script
# TTY_FLAG - optional -t flag to make docker allocate tty
# BUILD_INTEROP_DOCKER_EXTRA_ARGS - optional args to be passed to the
@@ -77,6 +79,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)"
$BASE_IMAGE \
bash -l /var/local/jenkins/grpc/tools/dockerfile/$BASE_NAME/build_interop_stress.sh \
&& docker commit $CONTAINER_NAME $INTEROP_IMAGE \
&& ( if [ -n "$INTEROP_IMAGE_REPOSITORY_TAG" ]; then docker tag -f $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG ; fi ) \
&& echo "Successfully built image $INTEROP_IMAGE")
EXITCODE=$?

@@ -0,0 +1,556 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import datetime
import os
import subprocess
import sys
import time
stress_test_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/stress_test'))
sys.path.append(stress_test_utils_dir)
from stress_test_utils import BigQueryHelper
kubernetes_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(kubernetes_api_dir)
import kubernetes_api
_GRPC_ROOT = os.path.abspath(os.path.join(
os.path.dirname(sys.argv[0]), '../../..'))
os.chdir(_GRPC_ROOT)
# num of seconds to wait for the GKE image to start and warmup
_GKE_IMAGE_WARMUP_WAIT_SECS = 60
_SERVER_POD_NAME = 'stress-server'
_CLIENT_POD_NAME_PREFIX = 'stress-client'
_DATASET_ID_PREFIX = 'stress_test'
_SUMMARY_TABLE_ID = 'summary'
_QPS_TABLE_ID = 'qps'
_DEFAULT_DOCKER_IMAGE_NAME = 'grpc_stress_test'
# The default port on which the kubernetes proxy server is started on localhost
# (i.e. kubectl proxy --port=<port>)
_DEFAULT_KUBERNETES_PROXY_PORT = 8001
# How frequently should the stress client wrapper script (running inside a GKE
# container) poll the health of the stress client (also running inside the GKE
# container) and upload metrics to BigQuery
_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS = 60
# The default setting for stress test server and client
_DEFAULT_STRESS_SERVER_PORT = 8080
_DEFAULT_METRICS_PORT = 8081
_DEFAULT_TEST_CASES_STR = 'empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1'
_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
_DEFAULT_NUM_STUBS_PER_CHANNEL = 10
_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS = 30
# Number of stress client instances to launch
_DEFAULT_NUM_CLIENTS = 3
# How frequently should this test monitor the health of Stress clients and
# Servers running in GKE
_DEFAULT_TEST_POLL_INTERVAL_SECS = 60
# Default run time for this test (2 hours)
_DEFAULT_TEST_DURATION_SECS = 7200
# The number of seconds it would take a GKE pod to warm up (i.e. get to 'Running'
# state from the time of creation). Ideally this is something the test should
# automatically determine by using the Kubernetes API to poll the pod's status.
_DEFAULT_GKE_WARMUP_SECS = 60
class KubernetesProxy:
""" Class to start a proxy on localhost to the Kubernetes API server """
def __init__(self, api_port):
self.port = api_port
self.p = None
self.started = False
def start(self):
cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
self.p = subprocess.Popen(args=cmd)
self.started = True
time.sleep(2)
print '..Started'
def get_port(self):
return self.port
def is_started(self):
return self.started
def __del__(self):
if self.p is not None:
print 'Shutting down Kubernetes proxy..'
self.p.kill()
class TestSettings:
def __init__(self, build_docker_image, test_poll_interval_secs,
test_duration_secs, kubernetes_proxy_port):
self.build_docker_image = build_docker_image
self.test_poll_interval_secs = test_poll_interval_secs
self.test_duration_secs = test_duration_secs
self.kubernetes_proxy_port = kubernetes_proxy_port
class GkeSettings:
def __init__(self, project_id, docker_image_name):
self.project_id = project_id
self.docker_image_name = docker_image_name
self.tag_name = 'gcr.io/%s/%s' % (project_id, docker_image_name)
class BigQuerySettings:
def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
self.run_id = run_id
self.dataset_id = dataset_id
self.summary_table_id = summary_table_id
self.qps_table_id = qps_table_id
class StressServerSettings:
def __init__(self, server_pod_name, server_port):
self.server_pod_name = server_pod_name
self.server_port = server_port
class StressClientSettings:
def __init__(self, num_clients, client_pod_name_prefix, server_pod_name,
server_port, metrics_port, metrics_collection_interval_secs,
stress_client_poll_interval_secs, num_channels_per_server,
num_stubs_per_channel, test_cases_str):
self.num_clients = num_clients
self.client_pod_name_prefix = client_pod_name_prefix
self.server_pod_name = server_pod_name
self.server_port = server_port
self.metrics_port = metrics_port
self.metrics_collection_interval_secs = metrics_collection_interval_secs
self.stress_client_poll_interval_secs = stress_client_poll_interval_secs
self.num_channels_per_server = num_channels_per_server
self.num_stubs_per_channel = num_stubs_per_channel
self.test_cases_str = test_cases_str
# == Derived properties ==
# Note: Client can accept a list of server addresses (a comma separated list
# of 'server_name:server_port'). In this case, we only have one server
# address to pass
self.server_addresses = '%s.default.svc.cluster.local:%d' % (
server_pod_name, server_port)
self.client_pod_names_list = ['%s-%d' % (client_pod_name_prefix, i)
for i in range(1, num_clients + 1)]
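# Example (hypothetical values): with server_pod_name='stress-server',
# server_port=8080, client_pod_name_prefix='stress-client' and num_clients=3,
# the derived values are:
#   server_addresses      = 'stress-server.default.svc.cluster.local:8080'
#   client_pod_names_list = ['stress-client-1', 'stress-client-2', 'stress-client-3']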
def _build_docker_image(image_name, tag_name):
""" Build the docker image and add tag it to the GKE repository """
print 'Building docker image: %s' % image_name
os.environ['INTEROP_IMAGE'] = image_name
os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = tag_name
# Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
# build_interop_stress_image.sh invokes the following script:
# tools/dockerfile/$BASE_NAME/build_interop_stress.sh
os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
cmd = ['tools/jenkins/build_interop_stress_image.sh']
retcode = subprocess.call(args=cmd)
if retcode != 0:
print 'Error in building docker image'
return False
return True
def _push_docker_image_to_gke_registry(docker_tag_name):
"""Executes 'gcloud docker push <docker_tag_name>' to push the image to GKE registry"""
cmd = ['gcloud', 'docker', 'push', docker_tag_name]
print 'Pushing %s to GKE registry..' % docker_tag_name
retcode = subprocess.call(args=cmd)
if retcode != 0:
print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
return False
return True
def _launch_server(gke_settings, stress_server_settings, bq_settings,
kubernetes_proxy):
""" Launches a stress test server instance in GKE cluster """
if not kubernetes_proxy.is_started():
print 'Kubernetes proxy must be started before calling this function'
return False
# This is the wrapper script that is run in the container. This script runs
# the actual stress test server
server_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py']
# run_server.py does not take any args from the command line. The args are
# instead passed via environment variables (see server_env below)
server_arg_list = []
# The parameters to the script run_server.py are injected into the container
# via environment variables
server_env = {
'STRESS_TEST_IMAGE_TYPE': 'SERVER',
'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
'STRESS_TEST_ARGS_STR': '--port=%s' % stress_server_settings.server_port,
'RUN_ID': bq_settings.run_id,
'POD_NAME': stress_server_settings.server_pod_name,
'GCP_PROJECT_ID': gke_settings.project_id,
'DATASET_ID': bq_settings.dataset_id,
'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
'QPS_TABLE_ID': bq_settings.qps_table_id
}
# Launch Server
is_success = kubernetes_api.create_pod_and_service(
'localhost',
kubernetes_proxy.get_port(),
'default', # Use 'default' namespace
stress_server_settings.server_pod_name,
gke_settings.tag_name,
[stress_server_settings.server_port], # Port that should be exposed
server_cmd_list,
server_arg_list,
server_env,
True # Headless = True for server. Since we want DNS records to be created by GKE
)
return is_success
def _launch_client(gke_settings, stress_server_settings, stress_client_settings,
bq_settings, kubernetes_proxy):
""" Launches a configurable number of stress test clients on GKE cluster """
if not kubernetes_proxy.is_started():
print 'Kubernetes proxy must be started before calling this function'
return False
stress_client_arg_list = [
'--server_addresses=%s' % stress_client_settings.server_addresses,
'--test_cases=%s' % stress_client_settings.test_cases_str,
'--num_stubs_per_channel=%d' %
stress_client_settings.num_stubs_per_channel
]
# This is the wrapper script that is run in the container. This script runs
# the actual stress client
client_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_client.py']
# run_client.py takes no args. All args are passed as env variables (see
# client_env)
client_arg_list = []
metrics_server_address = 'localhost:%d' % stress_client_settings.metrics_port
metrics_client_arg_list = [
'--metrics_server_address=%s' % metrics_server_address,
'--total_only=true'
]
# The parameters to the script run_client.py are injected into the container
# via environment variables
client_env = {
'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
'RUN_ID': bq_settings.run_id,
'POLL_INTERVAL_SECS':
str(stress_client_settings.stress_client_poll_interval_secs),
'GCP_PROJECT_ID': gke_settings.project_id,
'DATASET_ID': bq_settings.dataset_id,
'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
'QPS_TABLE_ID': bq_settings.qps_table_id
}
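# For illustration (hypothetical defaults), STRESS_TEST_ARGS_STR would expand to:
#   '--server_addresses=stress-server.default.svc.cluster.local:8080
#    --test_cases=empty_unary:1,large_unary:1,... --num_stubs_per_channel=10'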
for pod_name in stress_client_settings.client_pod_names_list:
client_env['POD_NAME'] = pod_name
is_success = kubernetes_api.create_pod_and_service(
'localhost', # Since proxy is running on localhost
kubernetes_proxy.get_port(),
'default', # default namespace
pod_name,
gke_settings.tag_name,
[stress_client_settings.metrics_port
], # Client pods expose metrics port
client_cmd_list,
client_arg_list,
client_env,
False # Client is not a headless service
)
if not is_success:
print 'Error in launching client %s' % pod_name
return False
return True
def _launch_server_and_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
kubernetes_proxy_port):
# Start kubernetes proxy
print 'Kubernetes proxy'
kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
kubernetes_proxy.start()
print 'Launching server..'
is_success = _launch_server(gke_settings, stress_server_settings, bq_settings,
kubernetes_proxy)
if not is_success:
print 'Error in launching server'
return False
# Server takes a while to start.
# TODO(sree) Use Kubernetes API to query the status of the server instead of
# sleeping
print 'Waiting for %s seconds for the server to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
# Launch client
client_pod_name_prefix = 'stress-client'
is_success = _launch_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
kubernetes_proxy)
if not is_success:
print 'Error in launching client(s)'
return False
print 'Waiting for %s seconds for the client images to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
return True
def _delete_server_and_client(stress_server_settings, stress_client_settings,
kubernetes_proxy_port):
kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
kubernetes_proxy.start()
# Delete clients first
is_success = True
for pod_name in stress_client_settings.client_pod_names_list:
is_success = kubernetes_api.delete_pod_and_service(
'localhost', kubernetes_proxy_port, 'default', pod_name)
if not is_success:
return False
# Delete server
is_success = kubernetes_api.delete_pod_and_service(
'localhost', kubernetes_proxy_port, 'default',
stress_server_settings.server_pod_name)
return is_success
def run_test_main(test_settings, gke_settings, stress_server_settings,
stress_client_settings):
is_success = True
if test_settings.build_docker_image:
is_success = _build_docker_image(gke_settings.docker_image_name,
gke_settings.tag_name)
if not is_success:
return False
is_success = _push_docker_image_to_gke_registry(gke_settings.tag_name)
if not is_success:
return False
# Create a unique id for this run (Note: Using timestamp instead of UUID to
# make it easier to deduce the date/time of the run just by looking at the run
# id. This is useful in debugging when looking at records in BigQuery)
run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
dataset_id = '%s_%s' % (_DATASET_ID_PREFIX, run_id)
# Big Query settings (common for both Stress Server and Client)
bq_settings = BigQuerySettings(run_id, dataset_id, _SUMMARY_TABLE_ID,
_QPS_TABLE_ID)
bq_helper = BigQueryHelper(run_id, '', '', args.project_id, dataset_id,
_SUMMARY_TABLE_ID, _QPS_TABLE_ID)
bq_helper.initialize()
try:
is_success = _launch_server_and_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
test_settings.kubernetes_proxy_port)
if not is_success:
return False
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(
seconds=test_settings.test_duration_secs)
print 'Running the test until %s' % end_time.isoformat()
while True:
if datetime.datetime.now() > end_time:
print 'Test was run for %d seconds' % test_settings.test_duration_secs
break
# Check if either stress server or clients have failed
if bq_helper.check_if_any_tests_failed():
is_success = False
print 'Some tests failed.'
break
# Things seem to be running fine. Wait until next poll time to check the
# status
print 'Sleeping for %d seconds..' % test_settings.test_poll_interval_secs
time.sleep(test_settings.test_poll_interval_secs)
# Print BigQuery tables
bq_helper.print_summary_records()
bq_helper.print_qps_records()
finally:
# If is_success is False at this point, it means that the stress tests were
# started successfully but failed while running the tests. In this case we
# should not delete the pods (since they contain all the failure
# information)
if is_success:
_delete_server_and_client(stress_server_settings, stress_client_settings,
test_settings.kubernetes_proxy_port)
return is_success
argp = argparse.ArgumentParser(
description='Launch stress tests in GKE',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argp.add_argument('--project_id',
required=True,
help='The Google Cloud Platform Project Id')
argp.add_argument('--num_clients',
default=1,
type=int,
help='Number of client instances to start')
argp.add_argument('--docker_image_name',
default=_DEFAULT_DOCKER_IMAGE_NAME,
help='The name of the docker image containing stress client '
'and stress servers')
argp.add_argument('--build_docker_image',
dest='build_docker_image',
action='store_true',
help='Build a docker image and push to Google Container '
'Registry')
argp.add_argument('--do_not_build_docker_image',
dest='build_docker_image',
action='store_false',
help='Do not build and push docker image to Google Container '
'Registry')
argp.set_defaults(build_docker_image=True)
argp.add_argument('--test_poll_interval_secs',
default=_DEFAULT_TEST_POLL_INTERVAL_SECS,
type=int,
help='How frequently this script should monitor the '
'health of stress clients and servers running in the GKE '
'cluster')
argp.add_argument('--test_duration_secs',
default=_DEFAULT_TEST_DURATION_SECS,
type=int,
help='How long should this test be run')
argp.add_argument('--kubernetes_proxy_port',
default=_DEFAULT_KUBERNETES_PROXY_PORT,
type=int,
help='The port on which the kubernetes proxy (on localhost)'
' is started')
argp.add_argument('--stress_server_port',
default=_DEFAULT_STRESS_SERVER_PORT,
type=int,
help='The port on which the stress server (in GKE '
'containers) listens')
argp.add_argument('--stress_client_metrics_port',
default=_DEFAULT_METRICS_PORT,
type=int,
help='The port on which the stress clients (in GKE '
'containers) expose metrics')
argp.add_argument('--stress_client_poll_interval_secs',
default=_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS,
type=int,
help='How frequently the stress client wrapper script'
' (running inside GKE) should monitor the health of the actual'
' stress client process and upload the metrics to BigQuery')
argp.add_argument('--stress_client_metrics_collection_interval_secs',
default=_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS,
type=int,
help='How frequently metrics should be collected in-memory on'
' the stress clients (running inside GKE containers). Note '
'that this is NOT the same as the upload-to-BigQuery '
'frequency. The metrics upload frequency is controlled by the'
' --stress_client_poll_interval_secs flag')
argp.add_argument('--stress_client_num_channels_per_server',
default=_DEFAULT_NUM_CHANNELS_PER_SERVER,
type=int,
help='The number of channels created to each server from a '
'stress client')
argp.add_argument('--stress_client_num_stubs_per_channel',
default=_DEFAULT_NUM_STUBS_PER_CHANNEL,
type=int,
help='The number of stubs created per channel. This number '
'indicates the max number of RPCs that can be made in '
'parallel on each channel at any given time')
argp.add_argument('--stress_client_test_cases',
default=_DEFAULT_TEST_CASES_STR,
help='List of test cases (with weights) to be executed by the'
' stress test client. The list is in the following format:\n'
' <testcase_1:w_1>,<testcase_2:w_2>,...,<testcase_n:w_n>\n'
' (Note: The weights do not have to add up to 100)')
if __name__ == '__main__':
args = argp.parse_args()
test_settings = TestSettings(
args.build_docker_image, args.test_poll_interval_secs,
args.test_duration_secs, args.kubernetes_proxy_port)
gke_settings = GkeSettings(args.project_id, args.docker_image_name)
stress_server_settings = StressServerSettings(_SERVER_POD_NAME,
args.stress_server_port)
stress_client_settings = StressClientSettings(
args.num_clients, _CLIENT_POD_NAME_PREFIX, _SERVER_POD_NAME,
args.stress_server_port, args.stress_client_metrics_port,
args.stress_client_metrics_collection_interval_secs,
args.stress_client_poll_interval_secs,
args.stress_client_num_channels_per_server,
args.stress_client_num_stubs_per_channel, args.stress_client_test_cases)
run_test_main(test_settings, gke_settings, stress_server_settings,
stress_client_settings)
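
A typical invocation of the launcher (with a hypothetical project id) would be: tools/run_tests/stress_test/run_stress_tests_on_gke.py --project_id=my-gcp-project --num_clients=3 --test_duration_secs=7200. The script builds and pushes the docker image by default; pass --do_not_build_docker_image to reuse an image that is already in the registry.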