Merge github.com:grpc/grpc into optionalize_roundrobin

pull/6000/head
Craig Tiller 9 years ago
commit 4c5edc10be
  1. 11
      src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
  2. 195
      src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
  3. 71
      src/csharp/Grpc.IntegrationTesting/GenericService.cs
  4. 1
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  5. 12
      src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
  6. 65
      src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
  7. 2
      src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
  8. 4
      templates/tools/dockerfile/gcp_api_libraries.include
  9. 40
      templates/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile.template
  10. 53
      tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
  11. 4
      tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
  12. 7
      tools/jenkins/build_interop_stress_image.sh
  13. 10
      tools/run_tests/stress_test/README.md
  14. 135
      tools/run_tests/stress_test/configs/opt-tsan-asan.json
  15. 86
      tools/run_tests/stress_test/configs/opt.json
  16. 636
      tools/run_tests/stress_test/run_on_gke.py
  17. 556
      tools/run_tests/stress_test/run_stress_tests_on_gke.py

@@ -1,6 +1,6 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -46,16 +46,13 @@ namespace Grpc.Testing
/// </summary>
public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService
{
private readonly int responseSize;
public BenchmarkServiceImpl(int responseSize)
public BenchmarkServiceImpl()
{
this.responseSize = responseSize;
}
public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
return Task.FromResult(response);
}
@@ -63,7 +60,7 @@ namespace Grpc.Testing
{
await requestStream.ForEachAsync(async request =>
{
var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
await responseStream.WriteAsync(response);
});
}

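With this change the benchmark server sizes each response from the incoming request rather than from a fixed constructor argument. A minimal client-side sketch of the new contract follows; the address and byte sizes are illustrative, not taken from this commit:

```csharp
// Sketch: the server now honors SimpleRequest.ResponseSize on every call.
using Google.Protobuf;
using Grpc.Core;
using Grpc.Testing;

var channel = new Channel("localhost:8080", ChannelCredentials.Insecure);
var client = BenchmarkService.NewClient(channel);

var request = new SimpleRequest
{
    ResponseSize = 100,  // requested payload size, previously fixed server-side
    Payload = new Payload { Body = ByteString.CopyFrom(new byte[271]) }
};
var response = client.UnaryCall(request);
// response.Payload.Body.Length should now be 100, regardless of request size.
```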
@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start client runners for performance testing.
/// </summary>
public static class ClientRunners
public class ClientRunners
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
/// <summary>
/// Creates a started client runner.
/// </summary>
public static IClientRunner CreateStarted(ClientConfig config)
{
Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop);
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
"Only closed loop scenario supported for C#");
GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
var channel = new Channel(target, credentials);
if (config.OutstandingRpcsPerChannel != 0)
{
Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
}
if (config.AsyncClientThreads != 0)
{
Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
}
if (config.CoreLimit != 0)
{
Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
}
if (config.CoreList.Count > 0)
{
Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
}
switch (config.RpcType)
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
List<ChannelOption> channelOptions = null;
if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
{
case RpcType.UNARY:
return new SyncUnaryClientRunner(channel,
config.PayloadConfig.SimpleParams.ReqSize,
config.HistogramParams);
case RpcType.STREAMING:
default:
throw new ArgumentException("Unsupported RpcType.");
channelOptions = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
};
}
var channel = new Channel(target, credentials, channelOptions);
return new ClientRunnerImpl(channel,
config.ClientType,
config.RpcType,
config.PayloadConfig,
config.HistogramParams);
}
}
/// <summary>
/// Client that starts synchronous unary calls in a closed loop.
/// </summary>
public class SyncUnaryClientRunner : IClientRunner
public class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
readonly int payloadSize;
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.IBenchmarkServiceClient client;
@@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams)
public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
this.payloadSize = payloadSize;
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
var threadBody = GetThreadBody();
this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
}
public ClientStats GetStats(bool reset)
@@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting
await channel.ShutdownAsync();
}
private void Run()
private void RunClosedLoopUnary()
{
var request = new SimpleRequest
{
Payload = CreateZerosPayload(payloadSize)
};
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
@@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting
}
}
private async Task RunClosedLoopUnaryAsync()
{
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await client.UnaryCallAsync(request);
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
}
private async Task RunClosedLoopStreamingAsync()
{
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
using (var call = client.StreamingCall())
{
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await call.RequestStream.WriteAsync(request);
await call.ResponseStream.MoveNext();
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
// finish the streaming call
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
}
private async Task RunGenericClosedLoopStreamingAsync()
{
var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
{
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await call.RequestStream.WriteAsync(request);
await call.ResponseStream.MoveNext();
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
// finish the streaming call
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
}
private Action GetThreadBody()
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
return () =>
{
RunGenericClosedLoopStreamingAsync().Wait();
};
}
GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
return RunClosedLoopUnary;
}
else if (clientType == ClientType.ASYNC_CLIENT)
{
switch (rpcType)
{
case RpcType.UNARY:
return () =>
{
RunClosedLoopUnaryAsync().Wait();
};
case RpcType.STREAMING:
return () =>
{
RunClosedLoopStreamingAsync().Wait();
};
}
}
throw new ArgumentException("Unsupported configuration.");
}
private SimpleRequest CreateSimpleRequest()
{
GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
return new SimpleRequest
{
Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
ResponseSize = payloadConfig.SimpleParams.RespSize
};
}
private byte[] CreateByteBufferRequest()
{
return new byte[payloadConfig.BytebufParams.ReqSize];
}
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };

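The rewritten runner is driven entirely by ClientConfig, so a single entry point now covers sync/async unary and streaming clients. A hypothetical configuration for an async streaming client might look as follows; the ClosedLoopLoadParams message name and the histogram values are assumptions based on the perf-test protos, not part of this diff:

```csharp
// Sketch: starting an async streaming client runner (values illustrative).
using Grpc.IntegrationTesting;
using Grpc.Testing;

var config = new ClientConfig
{
    ServerTargets = { "localhost:8080" },
    ClientType = ClientType.ASYNC_CLIENT,
    RpcType = RpcType.STREAMING,
    ClientChannels = 1,  // C# requires exactly one channel (checked above)
    LoadParams = new LoadParams { ClosedLoop = new ClosedLoopLoadParams() },
    PayloadConfig = new PayloadConfig
    {
        SimpleParams = new SimpleProtoParams { ReqSize = 100, RespSize = 100 }
    },
    HistogramParams = new HistogramParams { Resolution = 0.01, MaxPossible = 60e9 }
};
IClientRunner runner = ClientRunners.CreateStarted(config);
// Periodically sample latency stats; reset: true clears the histogram.
ClientStats stats = runner.GetStats(reset: true);
```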
@@ -0,0 +1,71 @@
#region Copyright notice and license
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
namespace Grpc.IntegrationTesting
{
/// <summary>
/// Utility methods for defining and calling a service that doesn't use protobufs
/// for serialization/deserialization.
/// </summary>
public static class GenericService
{
readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
MethodType.DuplexStreaming,
"grpc.testing.BenchmarkService",
"StreamingCall",
ByteArrayMarshaller,
ByteArrayMarshaller
);
public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
{
return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
.AddMethod(StreamingCallMethod, handler).Build();
}
}
}

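Since the marshaller passes byte arrays through untouched, any duplex-streaming handler over byte[] can be exposed through this service definition. A minimal echo handler, as a sketch (the port is illustrative):

```csharp
// Sketch: hosting a raw byte[] echo service via GenericService.BindHandler.
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.IntegrationTesting;

DuplexStreamingServerMethod<byte[], byte[]> echoHandler =
    async (requestStream, responseStream, context) =>
    {
        // Echo every frame back with no serialization overhead.
        await requestStream.ForEachAsync(async msg => await responseStream.WriteAsync(msg));
    };

var server = new Server
{
    Services = { GenericService.BindHandler(echoHandler) },
    Ports = { new ServerPort("[::]", 8080, ServerCredentials.Insecure) }
};
server.Start();
```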
@@ -120,6 +120,7 @@
<Compile Include="WorkerServiceImpl.cs" />
<Compile Include="QpsWorker.cs" />
<Compile Include="WallClockStopwatch.cs" />
<Compile Include="GenericService.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting
{
var serverConfig = new ServerConfig
{
ServerType = ServerType.ASYNC_SERVER,
PayloadConfig = new PayloadConfig
{
SimpleParams = new SimpleProtoParams
{
RespSize = 100
}
}
ServerType = ServerType.ASYNC_SERVER
};
serverRunner = ServerRunners.CreateStarted(serverConfig);
}
@@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting
{
SimpleParams = new SimpleProtoParams
{
ReqSize = 100
ReqSize = 100,
RespSize = 100
}
},
HistogramParams = new HistogramParams

@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start server runners for performance testing.
/// </summary>
public static class ServerRunners
public class ServerRunners
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>();
/// <summary>
/// Creates a started server runner.
/// </summary>
public static IServerRunner CreateStarted(ServerConfig config)
{
GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER);
Logger.Debug("ServerConfig: {0}", config);
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
// TODO: qps_driver needs to setup payload properly...
int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0;
if (config.AsyncServerThreads != 0)
{
Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value");
}
if (config.CoreLimit != 0)
{
Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value");
}
if (config.CoreList.Count > 0)
{
Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
}
ServerServiceDefinition service = null;
if (config.ServerType == ServerType.ASYNC_SERVER)
{
GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
"ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
service = BenchmarkService.BindService(new BenchmarkServiceImpl());
}
else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
{
var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
service = GenericService.BindHandler(genericService.StreamingCall);
}
else
{
throw new ArgumentException("Unsupported ServerType");
}
var server = new Server
{
Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
Services = { service },
Ports = { new ServerPort("[::]", config.Port, credentials) }
};
server.Start();
return new ServerRunnerImpl(server);
}
private class GenericServiceImpl
{
readonly byte[] response;
public GenericServiceImpl(int responseSize)
{
this.response = new byte[responseSize];
}
/// <summary>
/// Generic streaming call handler.
/// </summary>
public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
{
await requestStream.ForEachAsync(async request =>
{
await responseStream.WriteAsync(response);
});
}
}
}
/// <summary>
@@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting
{
return server.ShutdownAsync();
}
}
}
}

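Note the asymmetry this introduces: the protobuf BenchmarkService server now rejects a PayloadConfig (response size comes from each request), while the generic server requires one to size its fixed byte-buffer response. A hypothetical generic-server config; the ByteBufferParams message name is an assumption from the perf-test protos:

```csharp
// Sketch: starting the generic (non-protobuf) benchmark server.
using Grpc.IntegrationTesting;
using Grpc.Testing;

var config = new ServerConfig
{
    ServerType = ServerType.ASYNC_GENERIC_SERVER,
    Port = 8080,  // illustrative
    PayloadConfig = new PayloadConfig
    {
        // Required here: the generic handler replies with a fixed
        // zero-filled buffer of RespSize bytes.
        BytebufParams = new ByteBufferParams { ReqSize = 100, RespSize = 100 }
    }
};
IServerRunner runner = ServerRunners.CreateStarted(config);
```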
@@ -54,7 +54,9 @@
GRPC_XMACRO_ITEM.
#endif
#if TARGET_OS_IPHONE
GRPC_XMACRO_ITEM(isCell, IsWWAN)
#endif
GRPC_XMACRO_ITEM(reachable, Reachable)
GRPC_XMACRO_ITEM(transientConnection, TransientConnection)
GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired)

@@ -0,0 +1,4 @@
# Google Cloud platform API libraries
RUN apt-get update && apt-get install -y python-pip && apt-get clean
RUN pip install --upgrade google-api-python-client

@@ -0,0 +1,40 @@
%YAML 1.2
--- |
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
FROM debian:jessie
<%include file="../apt_get_basic.include"/>
<%include file="../ccache_setup.include"/>
<%include file="../cxx_deps.include"/>
<%include file="../gcp_api_libraries.include"/>
<%include file="../clang_update.include"/>
# Define the default command.
CMD ["bash"]

@@ -27,12 +27,9 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# A work-in-progress Dockerfile that allows running gRPC test suites
# inside a docker container.
FROM debian:jessie
# Install Git.
# Install Git and basic packages.
RUN apt-get update && apt-get install -y \
autoconf \
autotools-dev \
@@ -43,13 +40,16 @@ RUN apt-get update && apt-get install -y \
gcc \
gcc-multilib \
git \
golang \
gyp \
lcov \
libc6 \
libc6-dbg \
libc6-dev \
libgtest-dev \
libtool \
make \
perl \
strace \
python-dev \
python-setuptools \
@@ -59,7 +59,9 @@ RUN apt-get update && apt-get install -y \
wget \
zip && apt-get clean
RUN easy_install -U pip
#================
# Build profiling
RUN apt-get update && apt-get install -y time && apt-get clean
# Prepare ccache
RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
@@ -69,12 +71,47 @@ RUN ln -s /usr/bin/ccache /usr/local/bin/c++
RUN ln -s /usr/bin/ccache /usr/local/bin/clang
RUN ln -s /usr/bin/ccache /usr/local/bin/clang++
##################
#=================
# C++ dependencies
RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang
RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang && apt-get clean
# Google Cloud platform API libraries (for BigQuery)
# Google Cloud platform API libraries
RUN apt-get update && apt-get install -y python-pip && apt-get clean
RUN pip install --upgrade google-api-python-client
#=================
# Update clang to a version with improved tsan
RUN apt-get update && apt-get -y install python cmake && apt-get clean
RUN git clone -n -b release_38 http://llvm.org/git/llvm.git && \
cd llvm && git checkout ad57503 && cd ..
RUN git clone -n -b release_38 http://llvm.org/git/clang.git && \
cd clang && git checkout ad2c56e && cd ..
RUN git clone -n -b release_38 http://llvm.org/git/compiler-rt.git && \
cd compiler-rt && git checkout 3176922 && cd ..
RUN git clone -n -b release_38 \
http://llvm.org/git/clang-tools-extra.git && cd clang-tools-extra && \
git checkout c288525 && cd ..
RUN git clone -n -b release_38 http://llvm.org/git/libcxx.git && \
cd libcxx && git checkout fda3549 && cd ..
RUN git clone -n -b release_38 http://llvm.org/git/libcxxabi.git && \
cd libcxxabi && git checkout 8d4e51d && cd ..
RUN mv clang llvm/tools
RUN mv compiler-rt llvm/projects
RUN mv clang-tools-extra llvm/tools/clang/tools
RUN mv libcxx llvm/projects
RUN mv libcxxabi llvm/projects
RUN mkdir llvm-build
RUN cd llvm-build && cmake \
-DCMAKE_BUILD_TYPE:STRING=Release \
-DCMAKE_INSTALL_PREFIX:STRING=/usr \
-DLLVM_TARGETS_TO_BUILD:STRING=X86 \
../llvm
RUN make -C llvm-build && make -C llvm-build install && rm -rf llvm-build
# Define the default command.
CMD ["bash"]

@@ -41,5 +41,7 @@ cd /var/local/git/grpc
make install-certs
BUILD_TYPE=${BUILD_TYPE:=opt}
# build C++ interop stress client, interop client and server
make stress_test metrics_client interop_client interop_server
make CONFIG=$BUILD_TYPE stress_test metrics_client interop_client interop_server

@@ -34,10 +34,12 @@
set -x
# Params:
# INTEROP_IMAGE - name of tag of the final interop image
# INTEROP_IMAGE - Name of tag of the final interop image
# INTEROP_IMAGE_TAG - Optional. If set, the created image will be tagged using
# the command: 'docker tag $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG'
# BASE_NAME - base name used to locate the base Dockerfile and build script
# BASE_NAME - Base name used to locate the base Dockerfile and build script
# BUILD_TYPE - The 'CONFIG' variable passed to the 'make' command (example:
# asan, tsan. Default value: opt).
# TTY_FLAG - optional -t flag to make docker allocate tty
# BUILD_INTEROP_DOCKER_EXTRA_ARGS - optional args to be passed to the
# docker run command
@@ -71,6 +73,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)"
(docker run \
-e CCACHE_DIR=/tmp/ccache \
-e THIS_IS_REALLY_NEEDED='see https://github.com/docker/docker/issues/14203 for why docker is awful' \
-e BUILD_TYPE=${BUILD_TYPE:=opt} \
-i $TTY_FLAG \
$MOUNT_ARGS \
$BUILD_INTEROP_DOCKER_EXTRA_ARGS \

@@ -67,8 +67,10 @@ The script has several parameters and you can find out more details by using the
- `<grpc_root_dir>$ tools/run_tests/stress_test/run_stress_tests_on_gke.py --help`
> **Example**
> `$ tools/run_tests/stress_test/run_stress_tests_on_gke.py --project_id=sree-gce --test_duration_secs=180 --num_clients=5`
> ```bash
> $ # Change to the grpc root directory
> $ cd $GRPC_ROOT
> $ tools/run_tests/stress_test/run_on_gke.py --gcp_project_id=sree-gce --config_file=tools/run_tests/stress_test/configs/opt.json
> ```
> Launches the 5 instances of stress test clients, 1 instance of stress test server and runs the test for 180 seconds. The test would be run on the default container cluster (that you have set in `gcloud`) in the project `sree-gce`.
> Note: we currently do not have the ability to launch multiple instances of the server. This can be added very easily in future
> The above runs the stress test on GKE under the project `sree-gce` in the default cluster (the one you set via the `gcloud` command earlier). The test settings (like the number of client instances, servers, the parameters to pass, test cases, etc.) are all loaded from the config file `$GRPC_ROOT/tools/run_tests/stress_test/configs/opt.json`

@@ -0,0 +1,135 @@
{
"dockerImages": {
"grpc_stress_cxx_opt" : {
"buildScript": "tools/jenkins/build_interop_stress_image.sh",
"dockerFileDir": "grpc_interop_stress_cxx",
"buildType": "opt"
},
"grpc_stress_cxx_tsan": {
"buildScript": "tools/jenkins/build_interop_stress_image.sh",
"dockerFileDir": "grpc_interop_stress_cxx",
"buildType": "tsan"
},
"grpc_stress_cxx_asan": {
"buildScript": "tools/jenkins/build_interop_stress_image.sh",
"dockerFileDir": "grpc_interop_stress_cxx",
"buildType": "asan"
}
},
"clientTemplates": {
"baseTemplates": {
"default": {
"wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py",
"pollIntervalSecs": 60,
"clientArgs": {
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs":60
},
"metricsPort": 8081,
"metricsArgs": {
"metrics_server_address": "localhost:8081",
"total_only": "true"
}
}
},
"templates": {
"cxx_client_opt": {
"baseTemplate": "default",
"clientImagePath": "/var/local/git/grpc/bins/opt/stress_test",
"metricsClientImagePath": "/var/local/git/grpc/bins/opt/metrics_client"
},
"cxx_client_tsan": {
"baseTemplate": "default",
"clientImagePath": "/var/local/git/grpc/bins/tsan/stress_test",
"metricsClientImagePath": "/var/local/git/grpc/bins/tsan/metrics_client"
},
"cxx_client_asan": {
"baseTemplate": "default",
"clientImagePath": "/var/local/git/grpc/bins/asan/stress_test",
"metricsClientImagePath": "/var/local/git/grpc/bins/asan/metrics_client"
}
}
},
"serverTemplates": {
"baseTemplates":{
"default": {
"wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_server.py",
"serverPort": 8080,
"serverArgs": {
"port": 8080
}
}
},
"templates": {
"cxx_server_opt": {
"baseTemplate": "default",
"serverImagePath": "/var/local/git/grpc/bins/opt/interop_server"
},
"cxx_server_tsan": {
"baseTemplate": "default",
"serverImagePath": "/var/local/git/grpc/bins/tsan/interop_server"
},
"cxx_server_asan": {
"baseTemplate": "default",
"serverImagePath": "/var/local/git/grpc/bins/asan/interop_server"
}
}
},
"testMatrix": {
"serverPodSpecs": {
"stress-server-opt": {
"serverTemplate": "cxx_server_opt",
"dockerImage": "grpc_stress_cxx_opt",
"numInstances": 1
},
"stress-server-tsan": {
"serverTemplate": "cxx_server_tsan",
"dockerImage": "grpc_stress_cxx_tsan",
"numInstances": 1
},
"stress-server-asan": {
"serverTemplate": "cxx_server_asan",
"dockerImage": "grpc_stress_cxx_asan",
"numInstances": 1
}
},
"clientPodSpecs": {
"stress-client-opt": {
"clientTemplate": "cxx_client_opt",
"dockerImage": "grpc_stress_cxx_opt",
"numInstances": 3,
"serverPodSpec": "stress-server-opt"
},
"stress-client-tsan": {
"clientTemplate": "cxx_client_tsan",
"dockerImage": "grpc_stress_cxx_tsan",
"numInstances": 3,
"serverPodSpec": "stress-server-tsan"
},
"stress-client-asan": {
"clientTemplate": "cxx_client_asan",
"dockerImage": "grpc_stress_cxx_asan",
"numInstances": 3,
"serverPodSpec": "stress-server-asan"
}
}
},
"globalSettings": {
"buildDockerImages": true,
"pollIntervalSecs": 60,
"testDurationSecs": 7200,
"kubernetesProxyPort": 8001,
"datasetIdNamePrefix": "stress_test_opt_tsan",
"summaryTableId": "summary",
"qpsTableId": "qps",
"podWarmupSecs": 60
}
}

@@ -0,0 +1,86 @@
{
"dockerImages": {
"grpc_stress_cxx_opt" : {
"buildScript": "tools/jenkins/build_interop_stress_image.sh",
"dockerFileDir": "grpc_interop_stress_cxx",
"buildType": "opt"
}
},
"clientTemplates": {
"baseTemplates": {
"default": {
"wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py",
"pollIntervalSecs": 60,
"clientArgs": {
"num_channels_per_server":5,
"num_stubs_per_channel":10,
"test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
"metrics_port": 8081,
"metrics_collection_interval_secs":60
},
"metricsPort": 8081,
"metricsArgs": {
"metrics_server_address": "localhost:8081",
"total_only": "true"
}
}
},
"templates": {
"cxx_client_opt": {
"baseTemplate": "default",
"clientImagePath": "/var/local/git/grpc/bins/opt/stress_test",
"metricsClientImagePath": "/var/local/git/grpc/bins/opt/metrics_client"
}
}
},
"serverTemplates": {
"baseTemplates":{
"default": {
"wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_server.py",
"serverPort": 8080,
"serverArgs": {
"port": 8080
}
}
},
"templates": {
"cxx_server_opt": {
"baseTemplate": "default",
"serverImagePath": "/var/local/git/grpc/bins/opt/interop_server"
}
}
},
"testMatrix": {
"serverPodSpecs": {
"stress-server-opt": {
"serverTemplate": "cxx_server_opt",
"dockerImage": "grpc_stress_cxx_opt",
"numInstances": 1
}
},
"clientPodSpecs": {
"stress-client-opt": {
"clientTemplate": "cxx_client_opt",
"dockerImage": "grpc_stress_cxx_opt",
"numInstances": 10,
"serverPodSpec": "stress-server-opt"
}
}
},
"globalSettings": {
"buildDockerImages": true,
"pollIntervalSecs": 10,
"testDurationSecs": 120,
"kubernetesProxyPort": 8001,
"datasetIdNamePrefix": "stress_test_opt",
"summaryTableId": "summary",
"qpsTableId": "qps",
"podWarmupSecs": 60
}
}

@@ -0,0 +1,636 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import datetime
import json
import os
import subprocess
import sys
import time
stress_test_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/stress_test'))
sys.path.append(stress_test_utils_dir)
from stress_test_utils import BigQueryHelper
kubernetes_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(kubernetes_api_dir)
import kubernetes_api
class GlobalSettings:
def __init__(self, gcp_project_id, build_docker_images,
test_poll_interval_secs, test_duration_secs,
kubernetes_proxy_port, dataset_id_prefix, summary_table_id,
qps_table_id, pod_warmup_secs):
self.gcp_project_id = gcp_project_id
self.build_docker_images = build_docker_images
self.test_poll_interval_secs = test_poll_interval_secs
self.test_duration_secs = test_duration_secs
self.kubernetes_proxy_port = kubernetes_proxy_port
self.dataset_id_prefix = dataset_id_prefix
self.summary_table_id = summary_table_id
self.qps_table_id = qps_table_id
self.pod_warmup_secs = pod_warmup_secs
class ClientTemplate:
""" Contains all the common settings that are used by a stress client """
def __init__(self, name, client_image_path, metrics_client_image_path,
metrics_port, wrapper_script_path, poll_interval_secs,
client_args_dict, metrics_args_dict):
self.name = name
self.client_image_path = client_image_path
self.metrics_client_image_path = metrics_client_image_path
self.metrics_port = metrics_port
self.wrapper_script_path = wrapper_script_path
self.poll_interval_secs = poll_interval_secs
self.client_args_dict = client_args_dict
self.metrics_args_dict = metrics_args_dict
class ServerTemplate:
""" Contains all the common settings used by a stress server """
def __init__(self, name, server_image_path, wrapper_script_path, server_port,
server_args_dict):
self.name = name
self.server_image_path = server_image_path
self.wrapper_script_path = wrapper_script_path
self.server_port = server_port
self.server_args_dict = server_args_dict
class DockerImage:
""" Represents properties of a Docker image. Provides methods to build the
image and push it to GKE registry
"""
def __init__(self, gcp_project_id, image_name, build_script_path,
dockerfile_dir, build_type):
"""Args:
image_name: The docker image name
tag_name: The additional tag name. This is the name used when pushing the
docker image to GKE registry
build_script_path: The path to the build script that builds this docker
image
dockerfile_dir: The name of the directory under
'<grpc_root>/tools/dockerfile' that contains the dockerfile
"""
self.image_name = image_name
self.gcp_project_id = gcp_project_id
self.build_script_path = build_script_path
self.dockerfile_dir = dockerfile_dir
self.build_type = build_type
self.tag_name = self._make_tag_name(gcp_project_id, image_name)
def _make_tag_name(self, project_id, image_name):
return 'gcr.io/%s/%s' % (project_id, image_name)
def build_image(self):
print 'Building docker image: %s (tag: %s)' % (self.image_name,
self.tag_name)
os.environ['INTEROP_IMAGE'] = self.image_name
os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = self.tag_name
os.environ['BASE_NAME'] = self.dockerfile_dir
os.environ['BUILD_TYPE'] = self.build_type
print 'DEBUG: path: ', self.build_script_path
if subprocess.call(args=[self.build_script_path]) != 0:
print 'Error in building the Docker image'
return False
return True
def push_to_gke_registry(self):
cmd = ['gcloud', 'docker', 'push', self.tag_name]
print 'Pushing %s to the GKE registry..' % self.tag_name
if subprocess.call(args=cmd) != 0:
print 'Error in pushing the image %s to the GKE registry' % self.tag_name
return False
return True
class ServerPodSpec:
""" Contains the information required to launch server pods. """
def __init__(self, name, server_template, docker_image, num_instances):
self.name = name
self.template = server_template
self.docker_image = docker_image
self.num_instances = num_instances
def pod_names(self):
""" Return a list of names of server pods to create. """
return ['%s-%d' % (self.name, i) for i in range(1, self.num_instances + 1)]
def server_addresses(self):
""" Return string of server addresses in the following format:
'<server_pod_name_1>:<server_port>,<server_pod_name_2>:<server_port>...'
"""
return ','.join(['%s:%d' % (pod_name, self.template.server_port)
for pod_name in self.pod_names()])
class ClientPodSpec:
""" Contains the information required to launch client pods """
def __init__(self, name, client_template, docker_image, num_instances,
server_addresses):
self.name = name
self.template = client_template
self.docker_image = docker_image
self.num_instances = num_instances
self.server_addresses = server_addresses
def pod_names(self):
""" Return a list of names of client pods to create """
return ['%s-%d' % (self.name, i) for i in range(1, self.num_instances + 1)]
# The client args in the template do not have server addresses. This function
# adds the server addresses and returns the updated client args
def get_client_args_dict(self):
args_dict = self.template.client_args_dict.copy()
args_dict['server_addresses'] = self.server_addresses
return args_dict
class Gke:
""" Class that has helper methods to interact with GKE """
class KubernetesProxy:
"""Class to start a proxy on localhost to talk to the Kubernetes API server"""
def __init__(self, port):
cmd = ['kubectl', 'proxy', '--port=%d' % port]
self.p = subprocess.Popen(args=cmd)
time.sleep(2)
print '\nStarted kubernetes proxy on port: %d' % port
def __del__(self):
if self.p is not None:
print 'Shutting down Kubernetes proxy..'
self.p.kill()
def __init__(self, project_id, run_id, dataset_id, summary_table_id,
qps_table_id, kubernetes_port):
self.project_id = project_id
self.run_id = run_id
self.dataset_id = dataset_id
self.summary_table_id = summary_table_id
self.qps_table_id = qps_table_id
# The environment variables we would like to pass to every pod (both client
# and server) launched in GKE
self.gke_env = {
'RUN_ID': self.run_id,
'GCP_PROJECT_ID': self.project_id,
'DATASET_ID': self.dataset_id,
'SUMMARY_TABLE_ID': self.summary_table_id,
'QPS_TABLE_ID': self.qps_table_id
}
self.kubernetes_port = kubernetes_port
# Start kubernetes proxy
self.kubernetes_proxy = Gke.KubernetesProxy(kubernetes_port)
def _args_dict_to_str(self, args_dict):
return ' '.join('--%s=%s' % (k, args_dict[k]) for k in args_dict.keys())
def launch_servers(self, server_pod_spec):
is_success = True
# The command to run inside the container is the wrapper script (which then
# launches the actual server)
container_cmd = server_pod_spec.template.wrapper_script_path
# The parameters to the wrapper script (defined in
# server_pod_spec.template.wrapper_script_path) are injected into the
# container via environment variables
server_env = self.gke_env.copy()
server_env.update({
'STRESS_TEST_IMAGE_TYPE': 'SERVER',
'STRESS_TEST_IMAGE': server_pod_spec.template.server_image_path,
'STRESS_TEST_ARGS_STR': self._args_dict_to_str(
server_pod_spec.template.server_args_dict)
})
for pod_name in server_pod_spec.pod_names():
server_env['POD_NAME'] = pod_name
print 'Creating server: %s' % pod_name
is_success = kubernetes_api.create_pod_and_service(
'localhost',
self.kubernetes_port,
'default', # Use 'default' namespace
pod_name,
server_pod_spec.docker_image.tag_name,
[server_pod_spec.template.server_port], # Ports to expose on the pod
[container_cmd],
[], # Args list is empty since we are passing all args via env variables
server_env,
True # Headless = True for the server so that GKE creates a DNS record for pod_name
)
if not is_success:
print 'Error in launching server: %s' % pod_name
break
if is_success:
print 'Successfully created server(s)'
return is_success
def launch_clients(self, client_pod_spec):
is_success = True
# The command to run inside the container is the wrapper script (which then
# launches the actual stress client)
container_cmd = client_pod_spec.template.wrapper_script_path
# The parameters to the wrapper script (defined in
# client_pod_spec.template.wrapper_script_path) are injected into the
# container via environment variables
client_env = self.gke_env.copy()
client_env.update({
'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
'STRESS_TEST_IMAGE': client_pod_spec.template.client_image_path,
'STRESS_TEST_ARGS_STR': self._args_dict_to_str(
client_pod_spec.get_client_args_dict()),
'METRICS_CLIENT_IMAGE':
client_pod_spec.template.metrics_client_image_path,
'METRICS_CLIENT_ARGS_STR': self._args_dict_to_str(
client_pod_spec.template.metrics_args_dict),
'POLL_INTERVAL_SECS': str(client_pod_spec.template.poll_interval_secs)
})
for pod_name in client_pod_spec.pod_names():
client_env['POD_NAME'] = pod_name
print 'Creating client: %s' % pod_name
is_success = kubernetes_api.create_pod_and_service(
'localhost',
self.kubernetes_port,
'default', # default namespace,
pod_name,
client_pod_spec.docker_image.tag_name,
[client_pod_spec.template.metrics_port], # Ports to expose on the pod
[container_cmd],
[], # Empty args list since all args are passed via env variables
client_env,
False # Client is not a headless service.
)
if not is_success:
print 'Error in launching client %s' % pod_name
break
if is_success:
print 'Successfully created all client(s)'
return is_success
def _delete_pods(self, pod_name_list):
is_success = True
for pod_name in pod_name_list:
print 'Deleting %s' % pod_name
is_success = kubernetes_api.delete_pod_and_service(
'localhost',
self.kubernetes_port,
'default', # default namespace
pod_name)
if not is_success:
print 'Error in deleting pod %s' % pod_name
break
if is_success:
print 'Successfully deleted all pods'
return is_success
def delete_servers(self, server_pod_spec):
return self._delete_pods(server_pod_spec.pod_names())
def delete_clients(self, client_pod_spec):
return self._delete_pods(client_pod_spec.pod_names())
class Config:
def __init__(self, config_filename, gcp_project_id):
print 'Loading configuration...'
config_dict = self._load_config(config_filename)
self.global_settings = self._parse_global_settings(config_dict,
gcp_project_id)
self.docker_images_dict = self._parse_docker_images(
config_dict, self.global_settings.gcp_project_id)
self.client_templates_dict = self._parse_client_templates(config_dict)
self.server_templates_dict = self._parse_server_templates(config_dict)
self.server_pod_specs_dict = self._parse_server_pod_specs(
config_dict, self.docker_images_dict, self.server_templates_dict)
self.client_pod_specs_dict = self._parse_client_pod_specs(
config_dict, self.docker_images_dict, self.client_templates_dict,
self.server_pod_specs_dict)
print 'Loaded Configuration.'
def _parse_global_settings(self, config_dict, gcp_project_id):
global_settings_dict = config_dict['globalSettings']
return GlobalSettings(gcp_project_id,
global_settings_dict['buildDockerImages'],
global_settings_dict['pollIntervalSecs'],
global_settings_dict['testDurationSecs'],
global_settings_dict['kubernetesProxyPort'],
global_settings_dict['datasetIdNamePrefix'],
global_settings_dict['summaryTableId'],
global_settings_dict['qpsTableId'],
global_settings_dict['podWarmupSecs'])
def _parse_docker_images(self, config_dict, gcp_project_id):
"""Parses the 'dockerImages' section of the config file and returns a
Dictionary of 'DockerImage' objects keyed by docker image names"""
docker_images_dict = {}
docker_config_dict = config_dict['dockerImages']
for image_name in docker_config_dict.keys():
build_script_path = docker_config_dict[image_name]['buildScript']
dockerfile_dir = docker_config_dict[image_name]['dockerFileDir']
build_type = docker_config_dict[image_name]['buildType']
docker_images_dict[image_name] = DockerImage(gcp_project_id, image_name,
build_script_path,
dockerfile_dir, build_type)
return docker_images_dict
def _parse_client_templates(self, config_dict):
"""Parses the 'clientTemplates' section of the config file and returns a
Dictionary of 'ClientTemplate' objects keyed by client template names.
Note: The 'baseTemplates' sub-section of the config file contains templates
with default values and the 'templates' sub-section contains the actual
client templates (which refer to the base template name to use for default
values).
"""
client_templates_dict = {}
templates_dict = config_dict['clientTemplates']['templates']
base_templates_dict = config_dict['clientTemplates'].get('baseTemplates',
{})
for template_name in templates_dict.keys():
# temp_dict is a temporary dictionary that merges base template dictionary
# and client template dictionary (with client template dictionary values
# overriding base template values)
temp_dict = {}
base_template_name = templates_dict[template_name].get('baseTemplate')
if base_template_name is not None:
temp_dict = base_templates_dict[base_template_name].copy()
temp_dict.update(templates_dict[template_name])
# Create and add ClientTemplate object to the final client_templates_dict
client_templates_dict[template_name] = ClientTemplate(
template_name, temp_dict['clientImagePath'],
temp_dict['metricsClientImagePath'], temp_dict['metricsPort'],
temp_dict['wrapperScriptPath'], temp_dict['pollIntervalSecs'],
temp_dict['clientArgs'].copy(), temp_dict['metricsArgs'].copy())
return client_templates_dict
def _parse_server_templates(self, config_dict):
"""Parses the 'serverTemplates' section of the config file and returns a
Dictionary of 'serverTemplate' objects keyed by server template names.
Note: The 'baseTemplates' sub-section of the config file contains templates
with default values and the 'templates' sub-section contains the actual
server templates (which refer to the base template name to use for default
values).
"""
server_templates_dict = {}
templates_dict = config_dict['serverTemplates']['templates']
base_templates_dict = config_dict['serverTemplates'].get('baseTemplates',
{})
for template_name in templates_dict.keys():
# temp_dict is a temporary dictionary that merges base template dictionary
# and server template dictionary (with server template dictionary values
# overriding base template values)
temp_dict = {}
base_template_name = templates_dict[template_name].get('baseTemplate')
if base_template_name is not None:
temp_dict = base_templates_dict[base_template_name].copy()
temp_dict.update(templates_dict[template_name])
# Create and add ServerTemplate object to the final server_templates_dict
server_templates_dict[template_name] = ServerTemplate(
template_name, temp_dict['serverImagePath'],
temp_dict['wrapperScriptPath'], temp_dict['serverPort'],
temp_dict['serverArgs'].copy())
return server_templates_dict
def _parse_server_pod_specs(self, config_dict, docker_images_dict,
server_templates_dict):
"""Parses the 'serverPodSpecs' sub-section (under 'testMatrix' section) of
the config file and returns a Dictionary of 'ServerPodSpec' objects keyed
by server pod spec names"""
server_pod_specs_dict = {}
pod_specs_dict = config_dict['testMatrix'].get('serverPodSpecs', {})
for pod_name in pod_specs_dict.keys():
server_template_name = pod_specs_dict[pod_name]['serverTemplate']
docker_image_name = pod_specs_dict[pod_name]['dockerImage']
num_instances = pod_specs_dict[pod_name].get('numInstances', 1)
# Create and add the ServerPodSpec object to the final
# server_pod_specs_dict
server_pod_specs_dict[pod_name] = ServerPodSpec(
pod_name, server_templates_dict[server_template_name],
docker_images_dict[docker_image_name], num_instances)
return server_pod_specs_dict
def _parse_client_pod_specs(self, config_dict, docker_images_dict,
client_templates_dict, server_pod_specs_dict):
"""Parses the 'clientPodSpecs' sub-section (under 'testMatrix' section) of
the config file and returns a Dictionary of 'ClientPodSpec' objects keyed
by client pod spec names"""
client_pod_specs_dict = {}
pod_specs_dict = config_dict['testMatrix'].get('clientPodSpecs', {})
for pod_name in pod_specs_dict.keys():
client_template_name = pod_specs_dict[pod_name]['clientTemplate']
docker_image_name = pod_specs_dict[pod_name]['dockerImage']
num_instances = pod_specs_dict[pod_name]['numInstances']
# Get the server addresses from the server pod spec object
server_pod_spec_name = pod_specs_dict[pod_name]['serverPodSpec']
server_addresses = server_pod_specs_dict[
server_pod_spec_name].server_addresses()
client_pod_specs_dict[pod_name] = ClientPodSpec(
pod_name, client_templates_dict[client_template_name],
docker_images_dict[docker_image_name], num_instances,
server_addresses)
return client_pod_specs_dict
def _load_config(self, config_filename):
"""Opens the config file and converts the Json text to Dictionary"""
if not os.path.isabs(config_filename):
raise Exception('Config object expects an absolute file path. '
'config file name passed: %s' % config_filename)
with open(config_filename) as config_file:
return json.load(config_file)
def run_tests(config):
""" The main function that launches the stress tests """
# Build docker images and push to GKE registry
if config.global_settings.build_docker_images:
for name, docker_image in config.docker_images_dict.iteritems():
if not (docker_image.build_image() and
docker_image.push_to_gke_registry()):
return False
# Create a unique id for this run (Note: Using a timestamp instead of a UUID to
# make it easier to deduce the date/time of the run just by looking at the
# run id. This is useful in debugging when looking at records in BigQuery)
run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
dataset_id = '%s_%s' % (config.global_settings.dataset_id_prefix, run_id)
bq_helper = BigQueryHelper(run_id, '', '',
config.global_settings.gcp_project_id, dataset_id,
config.global_settings.summary_table_id,
config.global_settings.qps_table_id)
bq_helper.initialize()
gke = Gke(config.global_settings.gcp_project_id, run_id, dataset_id,
config.global_settings.summary_table_id,
config.global_settings.qps_table_id,
config.global_settings.kubernetes_proxy_port)
is_success = True
try:
print 'Launching servers..'
for name, server_pod_spec in config.server_pod_specs_dict.iteritems():
if not gke.launch_servers(server_pod_spec):
is_success = False # is_success is checked in the 'finally' block
return False
print('Launched servers. Waiting for %d seconds for the server pods to be '
'fully online') % config.global_settings.pod_warmup_secs
time.sleep(config.global_settings.pod_warmup_secs)
for name, client_pod_spec in config.client_pod_specs_dict.iteritems():
if not gke.launch_clients(client_pod_spec):
is_success = False # is_success is checked in the 'finally' block
return False
print('Launched all clients. Waiting for %d seconds for the client pods to '
'be fully online') % config.global_settings.pod_warmup_secs
time.sleep(config.global_settings.pod_warmup_secs)
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(
seconds=config.global_settings.test_duration_secs)
print 'Running the test until %s' % end_time.isoformat()
while True:
if datetime.datetime.now() > end_time:
print 'Test was run for %d seconds' % config.global_settings.test_duration_secs
break
# Check if either stress server or clients have failed (btw, the bq_helper
# monitors all the rows in the summary table and checks if any of them
# have a failure status)
if bq_helper.check_if_any_tests_failed():
is_success = False
print 'Some tests failed.'
break # Don't 'return' here. We still want to call bq_helper to print qps/summary tables
# Tests running fine. Wait until next poll time to check the status
print 'Sleeping for %d seconds..' % config.global_settings.test_poll_interval_secs
time.sleep(config.global_settings.test_poll_interval_secs)
# Print BigQuery tables
bq_helper.print_qps_records()
bq_helper.print_summary_records()
finally:
# If there was a test failure, we should not delete the pods since they
# would contain useful debug information (logs, core dumps etc)
if is_success:
for name, server_pod_spec in config.server_pod_specs_dict.iteritems():
gke.delete_servers(server_pod_spec)
for name, client_pod_spec in config.client_pod_specs_dict.iteritems():
gke.delete_clients(client_pod_spec)
return is_success
argp = argparse.ArgumentParser(
description='Launch stress tests in GKE',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argp.add_argument('--gcp_project_id',
required=True,
help='The Google Cloud Platform Project Id')
argp.add_argument('--config_file',
required=True,
type=str,
help='The test config file')
if __name__ == '__main__':
args = argp.parse_args()
config_filename = args.config_file
# Since we will be changing the current working directory to grpc root in the
# next step, we should check if the config filename path is a relative path
# (i.e. a path relative to the current working directory) and if so, convert it
# to an absolute path
if not os.path.isabs(config_filename):
config_filename = os.path.abspath(config_filename)
config = Config(config_filename, args.gcp_project_id)
# Change current working directory to grpc root
# (This is important because all relative file paths in the config file are
# supposed to be interpreted as relative to the GRPC root)
grpc_root = os.path.abspath(os.path.join(
os.path.dirname(sys.argv[0]), '../../..'))
os.chdir(grpc_root)
run_tests(config)

@@ -1,556 +0,0 @@
#!/usr/bin/env python2.7
# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import argparse
import datetime
import os
import subprocess
import sys
import time
stress_test_utils_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/stress_test'))
sys.path.append(stress_test_utils_dir)
from stress_test_utils import BigQueryHelper
kubernetes_api_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../../gcp/utils'))
sys.path.append(kubernetes_api_dir)
import kubernetes_api
_GRPC_ROOT = os.path.abspath(os.path.join(
os.path.dirname(sys.argv[0]), '../../..'))
os.chdir(_GRPC_ROOT)
# num of seconds to wait for the GKE image to start and warmup
_GKE_IMAGE_WARMUP_WAIT_SECS = 60
_SERVER_POD_NAME = 'stress-server'
_CLIENT_POD_NAME_PREFIX = 'stress-client'
_DATASET_ID_PREFIX = 'stress_test'
_SUMMARY_TABLE_ID = 'summary'
_QPS_TABLE_ID = 'qps'
_DEFAULT_DOCKER_IMAGE_NAME = 'grpc_stress_test'
# The default port on which the kubernetes proxy server is started on localhost
# (i.e kubectl proxy --port=<port>)
_DEFAULT_KUBERNETES_PROXY_PORT = 8001
# How frequently should the stress client wrapper script (running inside a GKE
# container) poll the health of the stress client (also running inside the GKE
# container) and upload metrics to BigQuery
_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS = 60
# The default setting for stress test server and client
_DEFAULT_STRESS_SERVER_PORT = 8080
_DEFAULT_METRICS_PORT = 8081
_DEFAULT_TEST_CASES_STR = 'empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1'
_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
_DEFAULT_NUM_STUBS_PER_CHANNEL = 10
_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS = 30
# Number of stress client instances to launch
_DEFAULT_NUM_CLIENTS = 3
# How frequently should this test monitor the health of Stress clients and
# Servers running in GKE
_DEFAULT_TEST_POLL_INTERVAL_SECS = 60
# Default run time for this test (2 hour)
_DEFAULT_TEST_DURATION_SECS = 7200
# The number of seconds it would take a GKE pod to warm up (i.e get to 'Running'
# state from the time of creation). Ideally this is something the test should
# automatically determine by using Kubernetes API to poll the pods status.
_DEFAULT_GKE_WARMUP_SECS = 60
class KubernetesProxy:
""" Class to start a proxy on localhost to the Kubernetes API server """
def __init__(self, api_port):
self.port = api_port
self.p = None
self.started = False
def start(self):
cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
self.p = subprocess.Popen(args=cmd)
self.started = True
time.sleep(2)
print '..Started'
def get_port(self):
return self.port
def is_started(self):
return self.started
def __del__(self):
if self.p is not None:
print 'Shutting down Kubernetes proxy..'
self.p.kill()
class TestSettings:
def __init__(self, build_docker_image, test_poll_interval_secs,
test_duration_secs, kubernetes_proxy_port):
self.build_docker_image = build_docker_image
self.test_poll_interval_secs = test_poll_interval_secs
self.test_duration_secs = test_duration_secs
self.kubernetes_proxy_port = kubernetes_proxy_port
class GkeSettings:
def __init__(self, project_id, docker_image_name):
self.project_id = project_id
self.docker_image_name = docker_image_name
self.tag_name = 'gcr.io/%s/%s' % (project_id, docker_image_name)
class BigQuerySettings:
def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
self.run_id = run_id
self.dataset_id = dataset_id
self.summary_table_id = summary_table_id
self.qps_table_id = qps_table_id
class StressServerSettings:
def __init__(self, server_pod_name, server_port):
self.server_pod_name = server_pod_name
self.server_port = server_port
class StressClientSettings:
def __init__(self, num_clients, client_pod_name_prefix, server_pod_name,
server_port, metrics_port, metrics_collection_interval_secs,
stress_client_poll_interval_secs, num_channels_per_server,
num_stubs_per_channel, test_cases_str):
self.num_clients = num_clients
self.client_pod_name_prefix = client_pod_name_prefix
self.server_pod_name = server_pod_name
self.server_port = server_port
self.metrics_port = metrics_port
self.metrics_collection_interval_secs = metrics_collection_interval_secs
self.stress_client_poll_interval_secs = stress_client_poll_interval_secs
self.num_channels_per_server = num_channels_per_server
self.num_stubs_per_channel = num_stubs_per_channel
self.test_cases_str = test_cases_str
# == Derived properties ==
# Note: The client can accept a list of server addresses (a comma-separated
# list of 'server_name:server_port'). In this case, we only have one server
# address to pass
self.server_addresses = '%s.default.svc.cluster.local:%d' % (
server_pod_name, server_port)
self.client_pod_names_list = ['%s-%d' % (client_pod_name_prefix, i)
for i in range(1, num_clients + 1)]
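# For example (hypothetical values), num_clients=2,
# client_pod_name_prefix='stress-client', server_pod_name='stress-server' and
# server_port=8080 derive:
#   server_addresses      = 'stress-server.default.svc.cluster.local:8080'
#   client_pod_names_list = ['stress-client-1', 'stress-client-2']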
def _build_docker_image(image_name, tag_name):
""" Build the docker image and add tag it to the GKE repository """
print 'Building docker image: %s' % image_name
os.environ['INTEROP_IMAGE'] = image_name
os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = tag_name
# Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
# build_interop_stress_image.sh invokes the following script:
# tools/dockerfile/$BASE_NAME/build_interop_stress.sh
os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
cmd = ['tools/jenkins/build_interop_stress_image.sh']
retcode = subprocess.call(args=cmd)
if retcode != 0:
print 'Error in building docker image'
return False
return True
def _push_docker_image_to_gke_registry(docker_tag_name):
"""Executes 'gcloud docker push <docker_tag_name>' to push the image to GKE registry"""
cmd = ['gcloud', 'docker', 'push', docker_tag_name]
print 'Pushing %s to GKE registry..' % docker_tag_name
retcode = subprocess.call(args=cmd)
if retcode != 0:
print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
return False
return True
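# A typical build-and-push flow combining the two helpers above (a sketch,
# with a hypothetical project id):
#   tag = 'gcr.io/my-project/grpc_stress_test'
#   if _build_docker_image('grpc_stress_test', tag):
#     _push_docker_image_to_gke_registry(tag)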
def _launch_server(gke_settings, stress_server_settings, bq_settings,
kubernetes_proxy):
""" Launches a stress test server instance in GKE cluster """
if not kubernetes_proxy.is_started:
print 'Kubernetes proxy must be started before calling this function'
return False
# This is the wrapper script that is run in the container. This script runs
# the actual stress test server
server_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py']
# run_server.py does not take any args from the command line. The args are
# instead passed via environment variables (see server_env below)
server_arg_list = []
# The parameters to the script run_server.py are injected into the container
# via environment variables
server_env = {
'STRESS_TEST_IMAGE_TYPE': 'SERVER',
'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
'STRESS_TEST_ARGS_STR': '--port=%s' % stress_server_settings.server_port,
'RUN_ID': bq_settings.run_id,
'POD_NAME': stress_server_settings.server_pod_name,
'GCP_PROJECT_ID': gke_settings.project_id,
'DATASET_ID': bq_settings.dataset_id,
'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
'QPS_TABLE_ID': bq_settings.qps_table_id
}
# Launch Server
is_success = kubernetes_api.create_pod_and_service(
'localhost',
kubernetes_proxy.get_port(),
'default', # Use 'default' namespace
stress_server_settings.server_pod_name,
gke_settings.tag_name,
[stress_server_settings.server_port], # Port that should be exposed
server_cmd_list,
server_arg_list,
server_env,
True # Headless = True for the server, since we want GKE to create DNS records for it
)
return is_success
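# Note: create_pod_and_service talks to the Kubernetes REST API through the
# local proxy; conceptually it amounts to POSTing a pod spec and a service
# spec to http://localhost:<proxy_port>/api/v1/namespaces/default/pods and
# .../services (endpoint paths shown for illustration; see tools/gcp/utils)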
def _launch_client(gke_settings, stress_server_settings, stress_client_settings,
bq_settings, kubernetes_proxy):
""" Launches a configurable number of stress test clients on GKE cluster """
if not kubernetes_proxy.is_started:
print 'Kubernetes proxy must be started before calling this function'
return False
stress_client_arg_list = [
'--server_addresses=%s' % stress_client_settings.server_addresses,
'--test_cases=%s' % stress_client_settings.test_cases_str,
'--num_stubs_per_channel=%d' %
stress_client_settings.num_stubs_per_channel
]
# This is the wrapper script that is run in the container. This script runs
# the actual stress client
client_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_client.py']
# run_client.py takes no args. All args are passed as env variables (see
# client_env)
client_arg_list = []
metrics_server_address = 'localhost:%d' % stress_client_settings.metrics_port
metrics_client_arg_list = [
'--metrics_server_address=%s' % metrics_server_address,
'--total_only=true'
]
# The parameters to the script run_client.py are injected into the container
# via environment variables
client_env = {
'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
'RUN_ID': bq_settings.run_id,
'POLL_INTERVAL_SECS':
str(stress_client_settings.stress_client_poll_interval_secs),
'GCP_PROJECT_ID': gke_settings.project_id,
'DATASET_ID': bq_settings.dataset_id,
'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
'QPS_TABLE_ID': bq_settings.qps_table_id
}
for pod_name in stress_client_settings.client_pod_names_list:
client_env['POD_NAME'] = pod_name
is_success = kubernetes_api.create_pod_and_service(
'localhost', # Since proxy is running on localhost
kubernetes_proxy.get_port(),
'default', # default namespace
pod_name,
gke_settings.tag_name,
[stress_client_settings.metrics_port
], # Client pods expose metrics port
client_cmd_list,
client_arg_list,
client_env,
False # Client is not a headless service
)
if not is_success:
print 'Error in launching client %s' % pod_name
return False
return True
def _launch_server_and_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
kubernetes_proxy_port):
# Start kubernetes proxy
print 'Starting Kubernetes proxy..'
kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
kubernetes_proxy.start()
print 'Launching server..'
is_success = _launch_server(gke_settings, stress_server_settings, bq_settings,
kubernetes_proxy)
if not is_success:
print 'Error in launching server'
return False
# Server takes a while to start.
# TODO(sree) Use Kubernetes API to query the status of the server instead of
# sleeping
print 'Waiting for %s seconds for the server to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
# Launch client(s); the pod name prefix comes from stress_client_settings
is_success = _launch_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
kubernetes_proxy)
if not is_success:
print 'Error in launching client(s)'
return False
print 'Waiting for %s seconds for the client images to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
return True
def _delete_server_and_client(stress_server_settings, stress_client_settings,
kubernetes_proxy_port):
kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
kubernetes_proxy.start()
# Delete clients first
is_success = True
for pod_name in stress_client_settings.client_pod_names_list:
is_success = kubernetes_api.delete_pod_and_service(
'localhost', kubernetes_proxy_port, 'default', pod_name)
if not is_success:
return False
# Delete server
is_success = kubernetes_api.delete_pod_and_service(
'localhost', kubernetes_proxy_port, 'default',
stress_server_settings.server_pod_name)
return is_success
def run_test_main(test_settings, gke_settings, stress_server_settings,
stress_client_settings):
is_success = True
if test_settings.build_docker_image:
is_success = _build_docker_image(gke_settings.docker_image_name,
gke_settings.tag_name)
if not is_success:
return False
is_success = _push_docker_image_to_gke_registry(gke_settings.tag_name)
if not is_success:
return False
# Create a unique id for this run (Note: Using timestamp instead of UUID to
# make it easier to deduce the date/time of the run just by looking at the
# run id. This is useful when debugging by looking at records in BigQuery)
run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
dataset_id = '%s_%s' % (_DATASET_ID_PREFIX, run_id)
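# e.g. a run started at 2016-05-20 14:30:05 (an illustrative timestamp) gets
# run_id '2016_05_20_14_30_05' and dataset_id 'stress_test_2016_05_20_14_30_05'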
# Big Query settings (common for both Stress Server and Client)
bq_settings = BigQuerySettings(run_id, dataset_id, _SUMMARY_TABLE_ID,
_QPS_TABLE_ID)
bq_helper = BigQueryHelper(run_id, '', '', gke_settings.project_id, dataset_id,
_SUMMARY_TABLE_ID, _QPS_TABLE_ID)
bq_helper.initialize()
try:
is_success = _launch_server_and_client(gke_settings, stress_server_settings,
stress_client_settings, bq_settings,
test_settings.kubernetes_proxy_port)
if not is_success:
return False
start_time = datetime.datetime.now()
end_time = start_time + datetime.timedelta(
seconds=test_settings.test_duration_secs)
print 'Running the test until %s' % end_time.isoformat()
while True:
if datetime.datetime.now() > end_time:
print 'Test was run for %d seconds' % test_settings.test_duration_secs
break
# Check if either stress server or clients have failed
if bq_helper.check_if_any_tests_failed():
is_success = False
print 'Some tests failed.'
break
# Things seem to be running fine. Wait until next poll time to check the
# status
print 'Sleeping for %d seconds..' % test_settings.test_poll_interval_secs
time.sleep(test_settings.test_poll_interval_secs)
# Print BigQuery tables
bq_helper.print_summary_records()
bq_helper.print_qps_records()
finally:
# If is_success is False at this point, it means that the stress tests were
# started successfully but failed while running. In that case we should not
# delete the pods (since they contain all the failure information)
if is_success:
_delete_server_and_client(stress_server_settings, stress_client_settings,
test_settings.kubernetes_proxy_port)
return is_success
argp = argparse.ArgumentParser(
description='Launch stress tests in GKE',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
argp.add_argument('--project_id',
required=True,
help='The Google Cloud Platform Project Id')
argp.add_argument('--num_clients',
default=_DEFAULT_NUM_CLIENTS,
type=int,
help='Number of client instances to start')
argp.add_argument('--docker_image_name',
default=_DEFAULT_DOCKER_IMAGE_NAME,
help='The name of the docker image containing the stress '
'client and stress server')
argp.add_argument('--build_docker_image',
dest='build_docker_image',
action='store_true',
help='Build a docker image and push to Google Container '
'Registry')
argp.add_argument('--do_not_build_docker_image',
dest='build_docker_image',
action='store_false',
help='Do not build and push docker image to Google Container '
'Registry')
argp.set_defaults(build_docker_image=True)
argp.add_argument('--test_poll_interval_secs',
default=_DEFAULT_TEST_POLL_INTERVAL_SECS,
type=int,
help='How frequently this script should monitor the '
'health of stress clients and servers running in the GKE '
'cluster')
argp.add_argument('--test_duration_secs',
default=_DEFAULT_TEST_DURATION_SECS,
type=int,
help='How long this test should run')
argp.add_argument('--kubernetes_proxy_port',
default=_DEFAULT_KUBERNETES_PROXY_PORT,
type=int,
help='The port on which the kubernetes proxy (on localhost)'
' is started')
argp.add_argument('--stress_server_port',
default=_DEFAULT_STRESS_SERVER_PORT,
type=int,
help='The port on which the stress server (in GKE '
'containers) listens')
argp.add_argument('--stress_client_metrics_port',
default=_DEFAULT_METRICS_PORT,
type=int,
help='The port on which the stress clients (in GKE '
'containers) expose metrics')
argp.add_argument('--stress_client_poll_interval_secs',
default=_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS,
type=int,
help='How frequently the stress client wrapper script '
'running inside GKE should monitor the health of the '
'actual stress client process and upload metrics to BigQuery')
argp.add_argument('--stress_client_metrics_collection_interval_secs',
default=_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS,
type=int,
help='How frequently should metrics be collected in-memory on'
' the stress clients (running inside GKE containers). Note '
'that this is NOT the same as the upload-to-BigQuery '
'frequency. The metrics upload frequency is controlled by the'
' --stress_client_poll_interval_secs flag')
argp.add_argument('--stress_client_num_channels_per_server',
default=_DEFAULT_NUM_CHANNELS_PER_SERVER,
type=int,
help='The number of channels created to each server from a '
'stress client')
argp.add_argument('--stress_client_num_stubs_per_channel',
default=_DEFAULT_NUM_STUBS_PER_CHANNEL,
type=int,
help='The number of stubs created per channel. This number '
'indicates the max number of RPCs that can be made in '
'parallel on each channel at any given time')
argp.add_argument('--stress_client_test_cases',
default=_DEFAULT_TEST_CASES_STR,
help='List of test cases (with weights) to be executed by the'
' stress test client. The list is in the following format:\n'
' <testcase_1:w_1>,<testcase_2:w_2>,...,<testcase_n:w_n>\n'
' (Note: The weights do not have to add up to 100)')
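# Sample invocation (a sketch; 'my-gcp-project' is a hypothetical project id
# and the script path is shown for illustration):
#   $ python tools/run_tests/stress_test/run_stress_tests_on_gke.py \
#       --project_id=my-gcp-project --num_clients=3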
if __name__ == '__main__':
args = argp.parse_args()
test_settings = TestSettings(
args.build_docker_image, args.test_poll_interval_secs,
args.test_duration_secs, args.kubernetes_proxy_port)
gke_settings = GkeSettings(args.project_id, args.docker_image_name)
stress_server_settings = StressServerSettings(_SERVER_POD_NAME,
args.stress_server_port)
stress_client_settings = StressClientSettings(
args.num_clients, _CLIENT_POD_NAME_PREFIX, _SERVER_POD_NAME,
args.stress_server_port, args.stress_client_metrics_port,
args.stress_client_metrics_collection_interval_secs,
args.stress_client_poll_interval_secs,
args.stress_client_num_channels_per_server,
args.stress_client_num_stubs_per_channel, args.stress_client_test_cases)
run_test_main(test_settings, gke_settings, stress_server_settings,
stress_client_settings)