diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
index 47a15224f16..1edeedae2fa 100644
--- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
@@ -1,6 +1,6 @@
 #region Copyright notice and license
 
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
 // All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
@@ -46,16 +46,13 @@ namespace Grpc.Testing
     /// </summary>
     public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService
     {
-        private readonly int responseSize;
-
-        public BenchmarkServiceImpl(int responseSize)
+        public BenchmarkServiceImpl()
         {
-            this.responseSize = responseSize;
         }
 
         public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
         {
-            var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+            var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
             return Task.FromResult(response);
         }
 
@@ -63,7 +60,7 @@ namespace Grpc.Testing
         {
             await requestStream.ForEachAsync(async request =>
             {
-                var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+                var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
                 await responseStream.WriteAsync(response);
             });
         }
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index c4016012cbb..e6dc2321c4c 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Google.Protobuf;
 using Grpc.Core;
+using Grpc.Core.Logging;
 using Grpc.Core.Utils;
 using NUnit.Framework;
 using Grpc.Testing;
@@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting
     /// <summary>
     /// Helper methods to start client runners for performance testing.
     /// </summary>
-    public static class ClientRunners
+    public class ClientRunners
     {
+        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
+
         /// <summary>
         /// Creates a started client runner.
         /// </summary>
         public static IClientRunner CreateStarted(ClientConfig config)
         {
+            Logger.Debug("ClientConfig: {0}", config);
             string target = config.ServerTargets.Single();
-            GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop);
+            GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
+                "Only closed loop scenario supported for C#");
+            GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
 
-            var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
-            var channel = new Channel(target, credentials);
+            if (config.OutstandingRpcsPerChannel != 0)
+            {
+                Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
+            }
+            if (config.AsyncClientThreads != 0)
+            {
+                Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
+            }
+            if (config.CoreLimit != 0)
+            {
+                Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
+            }
+            if (config.CoreList.Count > 0)
+            {
+                Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
+            }
 
-            switch (config.RpcType)
+            var credentials = config.SecurityParams != null ?
+                TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
+            List<ChannelOption> channelOptions = null;
+            if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
             {
-                case RpcType.UNARY:
-                    return new SyncUnaryClientRunner(channel,
-                        config.PayloadConfig.SimpleParams.ReqSize,
-                        config.HistogramParams);
-
-                case RpcType.STREAMING:
-                default:
-                    throw new ArgumentException("Unsupported RpcType.");
+                channelOptions = new List<ChannelOption>
+                {
+                    new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
+                };
             }
+            var channel = new Channel(target, credentials, channelOptions);
+
+            return new ClientRunnerImpl(channel,
+                config.ClientType,
+                config.RpcType,
+                config.PayloadConfig,
+                config.HistogramParams);
         }
     }
 
-    /// <summary>
-    /// Client that starts synchronous unary calls in a closed loop.
-    /// </summary>
-    public class SyncUnaryClientRunner : IClientRunner
+    public class ClientRunnerImpl : IClientRunner
     {
         const double SecondsToNanos = 1e9;
 
         readonly Channel channel;
-        readonly int payloadSize;
+        readonly ClientType clientType;
+        readonly RpcType rpcType;
+        readonly PayloadConfig payloadConfig;
         readonly Histogram histogram;
 
         readonly BenchmarkService.IBenchmarkServiceClient client;
@@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting
         readonly CancellationTokenSource stoppedCts;
         readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
 
-        public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams)
+        public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
         {
             this.channel = GrpcPreconditions.CheckNotNull(channel);
-            this.payloadSize = payloadSize;
+            this.clientType = clientType;
+            this.rpcType = rpcType;
+            this.payloadConfig = payloadConfig;
             this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
 
             this.stoppedCts = new CancellationTokenSource();
             this.client = BenchmarkService.NewClient(channel);
-            this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
+
+            var threadBody = GetThreadBody();
+            this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
         }
 
         public ClientStats GetStats(bool reset)
@@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting
         {
             await channel.ShutdownAsync();
         }
 
-        private void Run()
+        private void RunClosedLoopUnary()
         {
-            var request = new SimpleRequest
-            {
-                Payload = CreateZerosPayload(payloadSize)
-            };
+            var request = CreateSimpleRequest();
             var stopwatch = new Stopwatch();
 
             while (!stoppedCts.Token.IsCancellationRequested)
@@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting
             }
         }
 
+        private async Task RunClosedLoopUnaryAsync()
+        {
+            var request = CreateSimpleRequest();
+            var stopwatch = new Stopwatch();
+
+            while (!stoppedCts.Token.IsCancellationRequested)
+            {
+                stopwatch.Restart();
+                await client.UnaryCallAsync(request);
+                stopwatch.Stop();
+
+                // spec requires data point in nanoseconds.
+                histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+            }
+        }
+
+        private async Task RunClosedLoopStreamingAsync()
+        {
+            var request = CreateSimpleRequest();
+            var stopwatch = new Stopwatch();
+
+            using (var call = client.StreamingCall())
+            {
+                while (!stoppedCts.Token.IsCancellationRequested)
+                {
+                    stopwatch.Restart();
+                    await call.RequestStream.WriteAsync(request);
+                    await call.ResponseStream.MoveNext();
+                    stopwatch.Stop();
 
+                    // spec requires data point in nanoseconds.
+                    histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+                }
+
+                // finish the streaming call
+                await call.RequestStream.CompleteAsync();
+                Assert.IsFalse(await call.ResponseStream.MoveNext());
+            }
+        }
+
+        private async Task RunGenericClosedLoopStreamingAsync()
+        {
+            var request = CreateByteBufferRequest();
+            var stopwatch = new Stopwatch();
+
+            var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
+
+            using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
+            {
+                while (!stoppedCts.Token.IsCancellationRequested)
+                {
+                    stopwatch.Restart();
+                    await call.RequestStream.WriteAsync(request);
+                    await call.ResponseStream.MoveNext();
+                    stopwatch.Stop();
+
+                    // spec requires data point in nanoseconds.
+                    histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+                }
+
+                // finish the streaming call
+                await call.RequestStream.CompleteAsync();
+                Assert.IsFalse(await call.ResponseStream.MoveNext());
+            }
+        }
+
+        private Action GetThreadBody()
+        {
+            if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
+            {
+                GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
+                GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
+                return () =>
+                {
+                    RunGenericClosedLoopStreamingAsync().Wait();
+                };
+            }
+
+            GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+            if (clientType == ClientType.SYNC_CLIENT)
+            {
+                GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
+                return RunClosedLoopUnary;
+            }
+            else if (clientType == ClientType.ASYNC_CLIENT)
+            {
+                switch (rpcType)
+                {
+                    case RpcType.UNARY:
+                        return () =>
+                        {
+                            RunClosedLoopUnaryAsync().Wait();
+                        };
+                    case RpcType.STREAMING:
+                        return () =>
+                        {
+                            RunClosedLoopStreamingAsync().Wait();
+                        };
+                }
+            }
+            throw new ArgumentException("Unsupported configuration.");
+        }
+
+        private SimpleRequest CreateSimpleRequest()
+        {
+            GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+            return new SimpleRequest
+            {
+                Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
+                ResponseSize = payloadConfig.SimpleParams.RespSize
+            };
+        }
+
+        private byte[] CreateByteBufferRequest()
+        {
+            return new byte[payloadConfig.BytebufParams.ReqSize];
+        }
+
         private static Payload CreateZerosPayload(int size)
         {
             return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
new file mode 100644
index 00000000000..c6128264ac5
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
@@ -0,0 +1,71 @@
+#region Copyright notice and license
+
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+    /// <summary>
+    /// Utility methods for defining and calling a service that doesn't use protobufs
+    /// for serialization/deserialization.
+    /// </summary>
+    public static class GenericService
+    {
+        readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
+
+        public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
+            MethodType.DuplexStreaming,
+            "grpc.testing.BenchmarkService",
+            "StreamingCall",
+            ByteArrayMarshaller,
+            ByteArrayMarshaller
+        );
+
+        public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
+        {
+            return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
+                .AddMethod(StreamingCallMethod, handler).Build();
+        }
+    }
+}
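For orientation: `GenericService` above only defines the method descriptor and the server-side binding; a client reaches the method through the low-level `Calls` API with the same raw-bytes marshaller. A minimal sketch of a client invocation, assuming a `channel` created elsewhere (it mirrors `RunGenericClosedLoopStreamingAsync` in `ClientRunners.cs` above):

```csharp
// Sketch: invoking the generic byte-buffer streaming method from a client.
// `channel` is assumed to be an already-created Grpc.Core.Channel.
var callDetails = new CallInvocationDetails<byte[], byte[]>(
    channel, GenericService.StreamingCallMethod, new CallOptions());

using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
{
    await call.RequestStream.WriteAsync(new byte[10]);  // raw payload, no protobuf involved
    await call.ResponseStream.MoveNext();               // wait for the echoed response
    byte[] response = call.ResponseStream.Current;

    await call.RequestStream.CompleteAsync();           // half-close from the client side
}
```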
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 372991374ee..4c049944eaf 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -120,6 +120,7 @@
+    <Compile Include="GenericService.cs" />
diff --git a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
index 06d5ee93d88..a8cf75bd819 100644
--- a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
@@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting
         {
             var serverConfig = new ServerConfig
             {
-                ServerType = ServerType.ASYNC_SERVER,
-                PayloadConfig = new PayloadConfig
-                {
-                    SimpleParams = new SimpleProtoParams
-                    {
-                        RespSize = 100
-                    }
-                }
+                ServerType = ServerType.ASYNC_SERVER
             };
             serverRunner = ServerRunners.CreateStarted(serverConfig);
         }
@@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting
                 {
                     SimpleParams = new SimpleProtoParams
                     {
-                        ReqSize = 100
+                        ReqSize = 100,
+                        RespSize = 100
                     }
                 },
                 HistogramParams = new HistogramParams
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index 4a73645e6ca..c326378cfac 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
 using System.Threading.Tasks;
 using Google.Protobuf;
 using Grpc.Core;
+using Grpc.Core.Logging;
 using Grpc.Core.Utils;
 using NUnit.Framework;
 using Grpc.Testing;
@@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting
     /// <summary>
     /// Helper methods to start server runners for performance testing.
     /// </summary>
-    public static class ServerRunners
+    public class ServerRunners
     {
+        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>();
+
         /// <summary>
         /// Creates a started server runner.
         /// </summary>
         public static IServerRunner CreateStarted(ServerConfig config)
         {
-            GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER);
+            Logger.Debug("ServerConfig: {0}", config);
             var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
 
-            // TODO: qps_driver needs to setup payload properly...
-            int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0;
+            if (config.AsyncServerThreads != 0)
+            {
+                Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value");
+            }
+            if (config.CoreLimit != 0)
+            {
+                Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value");
+            }
+            if (config.CoreList.Count > 0)
+            {
+                Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
+            }
+
+            ServerServiceDefinition service = null;
+            if (config.ServerType == ServerType.ASYNC_SERVER)
+            {
+                GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
+                    "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
+                service = BenchmarkService.BindService(new BenchmarkServiceImpl());
+            }
+            else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
+            {
+                var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
+                service = GenericService.BindHandler(genericService.StreamingCall);
+            }
+            else
+            {
+                throw new ArgumentException("Unsupported ServerType");
+            }
+
             var server = new Server
             {
-                Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
+                Services = { service },
                 Ports = { new ServerPort("[::]", config.Port, credentials) }
             };
 
             server.Start();
             return new ServerRunnerImpl(server);
         }
+
+        private class GenericServiceImpl
+        {
+            readonly byte[] response;
+
+            public GenericServiceImpl(int responseSize)
+            {
+                this.response = new byte[responseSize];
+            }
+
+            /// <summary>
+            /// Generic streaming call handler.
+            /// </summary>
+            public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
+            {
+                await requestStream.ForEachAsync(async request =>
+                {
+                    await responseStream.WriteAsync(response);
+                });
+            }
+        }
     }
 
     /// <summary>
@@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting
         {
             return server.ShutdownAsync();
         }
-    }
-
+    }
 }
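Similarly, hosting the byte-buffer service outside of `ServerRunners` takes only a handler bound through `GenericService.BindHandler`. A minimal sketch under assumed values (port 8080, 100-byte responses); `CreateStarted` above does the equivalent, driven by `ServerConfig`:

```csharp
// Sketch: a standalone server hosting the generic byte-buffer echo service.
// The port (8080) and response size (100) are illustrative assumptions.
var response = new byte[100];
var server = new Server
{
    Services =
    {
        GenericService.BindHandler(async (requestStream, responseStream, context) =>
        {
            // write one fixed-size response per incoming message
            await requestStream.ForEachAsync(async request =>
            {
                await responseStream.WriteAsync(response);
            });
        })
    },
    Ports = { new ServerPort("[::]", 8080, ServerCredentials.Insecure) }
};
server.Start();
```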
diff --git a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
index 02871d5d028..4b92504b555 100644
--- a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
+++ b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
@@ -54,7 +54,9 @@ GRPC_XMACRO_ITEM.
 #endif
 
+#if TARGET_OS_IPHONE
 GRPC_XMACRO_ITEM(isCell, IsWWAN)
+#endif
 GRPC_XMACRO_ITEM(reachable, Reachable)
 GRPC_XMACRO_ITEM(transientConnection, TransientConnection)
 GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired)
diff --git a/templates/tools/dockerfile/gcp_api_libraries.include b/templates/tools/dockerfile/gcp_api_libraries.include
new file mode 100644
index 00000000000..669b0f887c8
--- /dev/null
+++ b/templates/tools/dockerfile/gcp_api_libraries.include
@@ -0,0 +1,4 @@
+# Google Cloud platform API libraries
+RUN apt-get update && apt-get install -y python-pip && apt-get clean
+RUN pip install --upgrade google-api-python-client
+
diff --git a/templates/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile.template b/templates/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile.template
new file mode 100644
index 00000000000..b1049d0d7f2
--- /dev/null
+++ b/templates/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile.template
@@ -0,0 +1,40 @@
+%YAML 1.2
+--- |
+  # Copyright 2015-2016, Google Inc.
+  # All rights reserved.
+  #
+  # Redistribution and use in source and binary forms, with or without
+  # modification, are permitted provided that the following conditions are
+  # met:
+  #
+  # * Redistributions of source code must retain the above copyright
+  # notice, this list of conditions and the following disclaimer.
+  # * Redistributions in binary form must reproduce the above
+  # copyright notice, this list of conditions and the following disclaimer
+  # in the documentation and/or other materials provided with the
+  # distribution.
+  # * Neither the name of Google Inc. nor the names of its
+  # contributors may be used to endorse or promote products derived from
+  # this software without specific prior written permission.
+  #
+  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+  # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+  # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+  # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+  # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+  # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+  # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+  # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+  # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+  FROM debian:jessie
+
+  <%include file="../apt_get_basic.include"/>
+  <%include file="../ccache_setup.include"/>
+  <%include file="../cxx_deps.include"/>
+  <%include file="../gcp_api_libraries.include"/>
+  <%include file="../clang_update.include"/>
+  # Define the default command.
+  CMD ["bash"]
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
index 4123cc1a26a..214747fd4a5 100644
--- a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
+++ b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
@@ -27,12 +27,9 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-# A work-in-progress Dockerfile that allows running gRPC test suites
-# inside a docker container.
-
 FROM debian:jessie
 
-# Install Git.
+# Install Git and basic packages.
 RUN apt-get update && apt-get install -y \
   autoconf \
   autotools-dev \
   build-essential \
   bzip2 \
   ccache \
   curl \
   gcc \
@@ -43,13 +40,16 @@ RUN apt-get update && apt-get install -y \
   gcc \
   gcc-multilib \
   git \
+  golang \
   gyp \
+  lcov \
   libc6 \
   libc6-dbg \
   libc6-dev \
   libgtest-dev \
   libtool \
   make \
+  perl \
   strace \
   python-dev \
   python-setuptools \
@@ -59,7 +59,9 @@ RUN apt-get update && apt-get install -y \
   wget \
   zip && apt-get clean
 
-RUN easy_install -U pip
+#================
+# Build profiling
+RUN apt-get update && apt-get install -y time && apt-get clean
 
 # Prepare ccache
 RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
@@ -69,12 +71,47 @@ RUN ln -s /usr/bin/ccache /usr/local/bin/c++
 RUN ln -s /usr/bin/ccache /usr/local/bin/clang
 RUN ln -s /usr/bin/ccache /usr/local/bin/clang++
 
-##################
+#=================
 # C++ dependencies
-RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang
+RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang && apt-get clean
 
-# Google Cloud platform API libraries (for BigQuery)
+# Google Cloud platform API libraries
+RUN apt-get update && apt-get install -y python-pip && apt-get clean
 RUN pip install --upgrade google-api-python-client
+
+#=================
+# Update clang to a version with improved tsan
+
+RUN apt-get update && apt-get -y install python cmake && apt-get clean
+
+RUN git clone -n -b release_38 http://llvm.org/git/llvm.git && \
+  cd llvm && git checkout ad57503 && cd ..
+RUN git clone -n -b release_38 http://llvm.org/git/clang.git && \
+  cd clang && git checkout ad2c56e && cd ..
+RUN git clone -n -b release_38 http://llvm.org/git/compiler-rt.git && \
+  cd compiler-rt && git checkout 3176922 && cd ..
+RUN git clone -n -b release_38 \
+  http://llvm.org/git/clang-tools-extra.git && cd clang-tools-extra && \
+  git checkout c288525 && cd ..
+RUN git clone -n -b release_38 http://llvm.org/git/libcxx.git && \
+  cd libcxx && git checkout fda3549 && cd ..
+RUN git clone -n -b release_38 http://llvm.org/git/libcxxabi.git && \
+  cd libcxxabi && git checkout 8d4e51d && cd ..
+
+RUN mv clang llvm/tools
+RUN mv compiler-rt llvm/projects
+RUN mv clang-tools-extra llvm/tools/clang/tools
+RUN mv libcxx llvm/projects
+RUN mv libcxxabi llvm/projects
+
+RUN mkdir llvm-build
+RUN cd llvm-build && cmake \
+  -DCMAKE_BUILD_TYPE:STRING=Release \
+  -DCMAKE_INSTALL_PREFIX:STRING=/usr \
+  -DLLVM_TARGETS_TO_BUILD:STRING=X86 \
+  ../llvm
+RUN make -C llvm-build && make -C llvm-build install && rm -rf llvm-build
+
 # Define the default command.
CMD ["bash"] diff --git a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh index 392bdfccda0..470db4c13fb 100755 --- a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh +++ b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh @@ -41,5 +41,7 @@ cd /var/local/git/grpc make install-certs +BUILD_TYPE=${BUILD_TYPE:=opt} + # build C++ interop stress client, interop client and server -make stress_test metrics_client interop_client interop_server +make CONFIG=$BUILD_TYPE stress_test metrics_client interop_client interop_server diff --git a/tools/jenkins/build_interop_stress_image.sh b/tools/jenkins/build_interop_stress_image.sh index 501dc5b7ca4..b5dbcc5ce4b 100755 --- a/tools/jenkins/build_interop_stress_image.sh +++ b/tools/jenkins/build_interop_stress_image.sh @@ -34,10 +34,12 @@ set -x # Params: -# INTEROP_IMAGE - name of tag of the final interop image +# INTEROP_IMAGE - Name of tag of the final interop image # INTEROP_IMAGE_TAG - Optional. If set, the created image will be tagged using # the command: 'docker tag $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG' -# BASE_NAME - base name used to locate the base Dockerfile and build script +# BASE_NAME - Base name used to locate the base Dockerfile and build script +# BUILD_TYPE - The 'CONFIG' variable passed to the 'make' command (example: +# asan, tsan. Default value: opt). # TTY_FLAG - optional -t flag to make docker allocate tty # BUILD_INTEROP_DOCKER_EXTRA_ARGS - optional args to be passed to the # docker run command @@ -71,6 +73,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)" (docker run \ -e CCACHE_DIR=/tmp/ccache \ -e THIS_IS_REALLY_NEEDED='see https://github.com/docker/docker/issues/14203 for why docker is awful' \ + -e BUILD_TYPE=${BUILD_TYPE:=opt} \ -i $TTY_FLAG \ $MOUNT_ARGS \ $BUILD_INTEROP_DOCKER_EXTRA_ARGS \ diff --git a/tools/run_tests/stress_test/README.md b/tools/run_tests/stress_test/README.md index 1a48e90c69e..84f9719cb16 100644 --- a/tools/run_tests/stress_test/README.md +++ b/tools/run_tests/stress_test/README.md @@ -67,8 +67,10 @@ The script has several parameters and you can find out more details by using the - `$ tools/run_tests/stress_test/run_stress_tests_on_gke.py --help` > **Example** -> `$ tools/run_tests/stress_test/run_stress_tests_on_gke.py --project_id=sree-gce --test_duration_secs=180 --num_clients=5` +> ```bash +> $ # Change to the grpc root directory +> $ cd $GRPC_ROOT +> $ tools/run_tests/stress_test/run_on_gke.py --project_id=sree-gce --config_file=tools/run_tests/stress_test/configs/opt.json +> ``` -> Launches the 5 instances of stress test clients, 1 instance of stress test server and runs the test for 180 seconds. The test would be run on the default container cluster (that you have set in `gcloud`) in the project `sree-gce`. - -> Note: we currently do not have the ability to launch multiple instances of the server. This can be added very easily in future +> The above runs the stress test on GKE under the project `sree-gce` in the default cluster (that you set by `gcloud` command earlier). 
diff --git a/tools/run_tests/stress_test/configs/opt-tsan-asan.json b/tools/run_tests/stress_test/configs/opt-tsan-asan.json
new file mode 100644
index 00000000000..1dc2d3fe086
--- /dev/null
+++ b/tools/run_tests/stress_test/configs/opt-tsan-asan.json
@@ -0,0 +1,135 @@
+{
+  "dockerImages": {
+    "grpc_stress_cxx_opt" : {
+      "buildScript": "tools/jenkins/build_interop_stress_image.sh",
+      "dockerFileDir": "grpc_interop_stress_cxx",
+      "buildType": "opt"
+    },
+    "grpc_stress_cxx_tsan": {
+      "buildScript": "tools/jenkins/build_interop_stress_image.sh",
+      "dockerFileDir": "grpc_interop_stress_cxx",
+      "buildType": "tsan"
+    },
+    "grpc_stress_cxx_asan": {
+      "buildScript": "tools/jenkins/build_interop_stress_image.sh",
+      "dockerFileDir": "grpc_interop_stress_cxx",
+      "buildType": "asan"
+    }
+  },
+
+  "clientTemplates": {
+    "baseTemplates": {
+      "default": {
+        "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py",
+        "pollIntervalSecs": 60,
+        "clientArgs": {
+          "num_channels_per_server":5,
+          "num_stubs_per_channel":10,
+          "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1",
+          "metrics_port": 8081,
+          "metrics_collection_interval_secs":60
+        },
+        "metricsPort": 8081,
+        "metricsArgs": {
+          "metrics_server_address": "localhost:8081",
+          "total_only": "true"
+        }
+      }
+    },
+    "templates": {
+      "cxx_client_opt": {
+        "baseTemplate": "default",
+        "clientImagePath": "/var/local/git/grpc/bins/opt/stress_test",
+        "metricsClientImagePath": "/var/local/git/grpc/bins/opt/metrics_client"
+      },
+      "cxx_client_tsan": {
+        "baseTemplate": "default",
+        "clientImagePath": "/var/local/git/grpc/bins/tsan/stress_test",
+        "metricsClientImagePath": "/var/local/git/grpc/bins/tsan/metrics_client"
+      },
+      "cxx_client_asan": {
+        "baseTemplate": "default",
+        "clientImagePath": "/var/local/git/grpc/bins/asan/stress_test",
+        "metricsClientImagePath": "/var/local/git/grpc/bins/asan/metrics_client"
+      }
+    }
+  },
+
+  "serverTemplates": {
+    "baseTemplates":{
+      "default": {
+        "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_server.py",
+        "serverPort": 8080,
+        "serverArgs": {
+          "port": 8080
+        }
+      }
+    },
+    "templates": {
+      "cxx_server_opt": {
+        "baseTemplate": "default",
+        "serverImagePath": "/var/local/git/grpc/bins/opt/interop_server"
+      },
+      "cxx_server_tsan": {
+        "baseTemplate": "default",
+        "serverImagePath": "/var/local/git/grpc/bins/tsan/interop_server"
+      },
+      "cxx_server_asan": {
+        "baseTemplate": "default",
+        "serverImagePath": "/var/local/git/grpc/bins/asan/interop_server"
+      }
+    }
+  },
+
+  "testMatrix": {
+    "serverPodSpecs": {
+      "stress-server-opt": {
+        "serverTemplate": "cxx_server_opt",
+        "dockerImage": "grpc_stress_cxx_opt",
+        "numInstances": 1
+      },
+      "stress-server-tsan": {
+        "serverTemplate": "cxx_server_tsan",
+        "dockerImage": "grpc_stress_cxx_tsan",
+        "numInstances": 1
+      },
+      "stress-server-asan": {
+        "serverTemplate": "cxx_server_asan",
+        "dockerImage": "grpc_stress_cxx_asan",
+        "numInstances": 1
+      }
+    },
+
+    "clientPodSpecs": {
+      "stress-client-opt": {
+        "clientTemplate": "cxx_client_opt",
+        "dockerImage": "grpc_stress_cxx_opt",
+        "numInstances": 3,
+        "serverPodSpec": "stress-server-opt"
+      },
+      "stress-client-tsan": {
+        "clientTemplate": "cxx_client_tsan",
+        "dockerImage": "grpc_stress_cxx_tsan",
+        "numInstances": 3,
+        "serverPodSpec": "stress-server-tsan"
+      },
"stress-client-asan": { + "clientTemplate": "cxx_client_asan", + "dockerImage": "grpc_stress_cxx_asan", + "numInstances": 3, + "serverPodSpec": "stress-server-asan" + } + } + }, + + "globalSettings": { + "buildDockerImages": true, + "pollIntervalSecs": 60, + "testDurationSecs": 7200, + "kubernetesProxyPort": 8001, + "datasetIdNamePrefix": "stress_test_opt_tsan", + "summaryTableId": "summary", + "qpsTableId": "qps", + "podWarmupSecs": 60 + } +} diff --git a/tools/run_tests/stress_test/configs/opt.json b/tools/run_tests/stress_test/configs/opt.json new file mode 100644 index 00000000000..7fc024034b8 --- /dev/null +++ b/tools/run_tests/stress_test/configs/opt.json @@ -0,0 +1,86 @@ +{ + "dockerImages": { + "grpc_stress_cxx_opt" : { + "buildScript": "tools/jenkins/build_interop_stress_image.sh", + "dockerFileDir": "grpc_interop_stress_cxx", + "buildType": "opt" + } + }, + + "clientTemplates": { + "baseTemplates": { + "default": { + "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_client.py", + "pollIntervalSecs": 60, + "clientArgs": { + "num_channels_per_server":5, + "num_stubs_per_channel":10, + "test_cases": "empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1", + "metrics_port": 8081, + "metrics_collection_interval_secs":60 + }, + "metricsPort": 8081, + "metricsArgs": { + "metrics_server_address": "localhost:8081", + "total_only": "true" + } + } + }, + "templates": { + "cxx_client_opt": { + "baseTemplate": "default", + "clientImagePath": "/var/local/git/grpc/bins/opt/stress_test", + "metricsClientImagePath": "/var/local/git/grpc/bins/opt/metrics_client" + } + } + }, + + "serverTemplates": { + "baseTemplates":{ + "default": { + "wrapperScriptPath": "/var/local/git/grpc/tools/gcp/stress_test/run_server.py", + "serverPort": 8080, + "serverArgs": { + "port": 8080 + } + } + }, + "templates": { + "cxx_server_opt": { + "baseTemplate": "default", + "serverImagePath": "/var/local/git/grpc/bins/opt/interop_server" + } + } + }, + + "testMatrix": { + "serverPodSpecs": { + "stress-server-opt": { + "serverTemplate": "cxx_server_opt", + "dockerImage": "grpc_stress_cxx_opt", + "numInstances": 1 + } + }, + + "clientPodSpecs": { + "stress-client-opt": { + "clientTemplate": "cxx_client_opt", + "dockerImage": "grpc_stress_cxx_opt", + "numInstances": 10, + "serverPodSpec": "stress-server-opt" + } + } + }, + + "globalSettings": { + "buildDockerImages": true, + "pollIntervalSecs": 10, + "testDurationSecs": 120, + "kubernetesProxyPort": 8001, + "datasetIdNamePrefix": "stress_test_opt", + "summaryTableId": "summary", + "qpsTableId": "qps", + "podWarmupSecs": 60 + } +} + diff --git a/tools/run_tests/stress_test/run_on_gke.py b/tools/run_tests/stress_test/run_on_gke.py new file mode 100755 index 00000000000..3a81c1a3763 --- /dev/null +++ b/tools/run_tests/stress_test/run_on_gke.py @@ -0,0 +1,636 @@ +#!/usr/bin/env python2.7 +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. 
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import argparse
+import datetime
+import json
+import os
+import subprocess
+import sys
+import time
+
+stress_test_utils_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../../gcp/stress_test'))
+sys.path.append(stress_test_utils_dir)
+from stress_test_utils import BigQueryHelper
+
+kubernetes_api_dir = os.path.abspath(os.path.join(
+    os.path.dirname(__file__), '../../gcp/utils'))
+sys.path.append(kubernetes_api_dir)
+
+import kubernetes_api
+
+
+class GlobalSettings:
+
+  def __init__(self, gcp_project_id, build_docker_images,
+               test_poll_interval_secs, test_duration_secs,
+               kubernetes_proxy_port, dataset_id_prefix, summary_table_id,
+               qps_table_id, pod_warmup_secs):
+    self.gcp_project_id = gcp_project_id
+    self.build_docker_images = build_docker_images
+    self.test_poll_interval_secs = test_poll_interval_secs
+    self.test_duration_secs = test_duration_secs
+    self.kubernetes_proxy_port = kubernetes_proxy_port
+    self.dataset_id_prefix = dataset_id_prefix
+    self.summary_table_id = summary_table_id
+    self.qps_table_id = qps_table_id
+    self.pod_warmup_secs = pod_warmup_secs
+
+
+class ClientTemplate:
+  """ Contains all the common settings that are used by a stress client """
+
+  def __init__(self, name, client_image_path, metrics_client_image_path,
+               metrics_port, wrapper_script_path, poll_interval_secs,
+               client_args_dict, metrics_args_dict):
+    self.name = name
+    self.client_image_path = client_image_path
+    self.metrics_client_image_path = metrics_client_image_path
+    self.metrics_port = metrics_port
+    self.wrapper_script_path = wrapper_script_path
+    self.poll_interval_secs = poll_interval_secs
+    self.client_args_dict = client_args_dict
+    self.metrics_args_dict = metrics_args_dict
+
+
+class ServerTemplate:
+  """ Contains all the common settings used by a stress server """
+
+  def __init__(self, name, server_image_path, wrapper_script_path, server_port,
+               server_args_dict):
+    self.name = name
+    self.server_image_path = server_image_path
+    self.wrapper_script_path = wrapper_script_path
+    self.server_port = server_port
+    self.server_args_dict = server_args_dict
+
+
+class DockerImage:
+  """ Represents properties of a Docker image. Provides methods to build the
+  image and push it to GKE registry
+  """
+
+  def __init__(self, gcp_project_id, image_name, build_script_path,
+               dockerfile_dir, build_type):
+    """Args:
+
+      gcp_project_id: The Google Cloud Platform project id. Used to derive
+        self.tag_name, the name used when pushing the docker image to the
+        GKE registry
+      image_name: The docker image name
+      build_script_path: The path to the build script that builds this docker
+        image
+      dockerfile_dir: The name of the directory under
+        '<grpc_root>/tools/dockerfile' that contains the dockerfile
+      build_type: The 'CONFIG' value passed to the make command (example:
+        opt, asan, tsan)
+    """
+    self.image_name = image_name
+    self.gcp_project_id = gcp_project_id
+    self.build_script_path = build_script_path
+    self.dockerfile_dir = dockerfile_dir
+    self.build_type = build_type
+    self.tag_name = self._make_tag_name(gcp_project_id, image_name)
+
+  def _make_tag_name(self, project_id, image_name):
+    return 'gcr.io/%s/%s' % (project_id, image_name)
+
+  def build_image(self):
+    print 'Building docker image: %s (tag: %s)' % (self.image_name,
+                                                   self.tag_name)
+    os.environ['INTEROP_IMAGE'] = self.image_name
+    os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = self.tag_name
+    os.environ['BASE_NAME'] = self.dockerfile_dir
+    os.environ['BUILD_TYPE'] = self.build_type
+    print 'DEBUG: path: ', self.build_script_path
+    if subprocess.call(args=[self.build_script_path]) != 0:
+      print 'Error in building the Docker image'
+      return False
+    return True
+
+  def push_to_gke_registry(self):
+    cmd = ['gcloud', 'docker', 'push', self.tag_name]
+    print 'Pushing %s to the GKE registry..' % self.tag_name
+    if subprocess.call(args=cmd) != 0:
+      print 'Error in pushing the image %s to the GKE registry' % self.tag_name
+      return False
+    return True
+
+
+class ServerPodSpec:
+  """ Contains the information required to launch server pods. """
+
+  def __init__(self, name, server_template, docker_image, num_instances):
+    self.name = name
+    self.template = server_template
+    self.docker_image = docker_image
+    self.num_instances = num_instances
+
+  def pod_names(self):
+    """ Return a list of names of server pods to create. """
+    return ['%s-%d' % (self.name, i) for i in range(1, self.num_instances + 1)]
+
+  def server_addresses(self):
+    """ Return string of server addresses in the following format:
+      '<server_name>:<server_port>,<server_name>:<server_port>...'
+    """
+    return ','.join(['%s:%d' % (pod_name, self.template.server_port)
+                     for pod_name in self.pod_names()])
+
+
+class ClientPodSpec:
+  """ Contains the information required to launch client pods """
+
+  def __init__(self, name, client_template, docker_image, num_instances,
+               server_addresses):
+    self.name = name
+    self.template = client_template
+    self.docker_image = docker_image
+    self.num_instances = num_instances
+    self.server_addresses = server_addresses
+
+  def pod_names(self):
+    """ Return a list of names of client pods to create """
+    return ['%s-%d' % (self.name, i) for i in range(1, self.num_instances + 1)]
+
+  # The client args in the template do not have server addresses. This function
+  # adds the server addresses and returns the updated client args
+  def get_client_args_dict(self):
+    args_dict = self.template.client_args_dict.copy()
+    args_dict['server_addresses'] = self.server_addresses
+    return args_dict
+
+
+class Gke:
+  """ Class that has helper methods to interact with GKE """
+
+  class KubernetesProxy:
+    """Class to start a proxy on localhost to talk to the Kubernetes API server"""
+
+    def __init__(self, port):
+      cmd = ['kubectl', 'proxy', '--port=%d' % port]
+      self.p = subprocess.Popen(args=cmd)
+      time.sleep(2)
+      print '\nStarted kubernetes proxy on port: %d' % port
+
+    def __del__(self):
+      if self.p is not None:
+        print 'Shutting down Kubernetes proxy..'
+        self.p.kill()
+
+  def __init__(self, project_id, run_id, dataset_id, summary_table_id,
+               qps_table_id, kubernetes_port):
+    self.project_id = project_id
+    self.run_id = run_id
+    self.dataset_id = dataset_id
+    self.summary_table_id = summary_table_id
+    self.qps_table_id = qps_table_id
+
+    # The environment variables we would like to pass to every pod (both client
+    # and server) launched in GKE
+    self.gke_env = {
+        'RUN_ID': self.run_id,
+        'GCP_PROJECT_ID': self.project_id,
+        'DATASET_ID': self.dataset_id,
+        'SUMMARY_TABLE_ID': self.summary_table_id,
+        'QPS_TABLE_ID': self.qps_table_id
+    }
+
+    self.kubernetes_port = kubernetes_port
+    # Start kubernetes proxy
+    self.kubernetes_proxy = Gke.KubernetesProxy(kubernetes_port)
+
+  def _args_dict_to_str(self, args_dict):
+    return ' '.join('--%s=%s' % (k, args_dict[k]) for k in args_dict.keys())
+
+  def launch_servers(self, server_pod_spec):
+    is_success = True
+
+    # The command to run inside the container is the wrapper script (which then
+    # launches the actual server)
+    container_cmd = server_pod_spec.template.wrapper_script_path
+
+    # The parameters to the wrapper script (defined in
+    # server_pod_spec.template.wrapper_script_path) are injected into the
+    # container via environment variables
+    server_env = self.gke_env.copy()
+    server_env.update({
+        'STRESS_TEST_IMAGE_TYPE': 'SERVER',
+        'STRESS_TEST_IMAGE': server_pod_spec.template.server_image_path,
+        'STRESS_TEST_ARGS_STR': self._args_dict_to_str(
+            server_pod_spec.template.server_args_dict)
+    })
+
+    for pod_name in server_pod_spec.pod_names():
+      server_env['POD_NAME'] = pod_name
+      print 'Creating server: %s' % pod_name
+      is_success = kubernetes_api.create_pod_and_service(
+          'localhost',
+          self.kubernetes_port,
+          'default',  # Use 'default' namespace
+          pod_name,
+          server_pod_spec.docker_image.tag_name,
+          [server_pod_spec.template.server_port],  # Ports to expose on the pod
+          [container_cmd],
+          [],  # Args list is empty since we are passing all args via env variables
+          server_env,
+          True  # Headless = True for server so that GKE creates a DNS record for pod_name
+      )
+      if not is_success:
+        print 'Error in launching server: %s' % pod_name
+        break
+
+    if is_success:
+      print 'Successfully created server(s)'
+
+    return is_success
+
+  def launch_clients(self, client_pod_spec):
+    is_success = True
+
+    # The command to run inside the container is the wrapper script (which then
+    # launches the actual stress client)
+    container_cmd = client_pod_spec.template.wrapper_script_path
+
+    # The parameters to the wrapper script (defined in
+    # client_pod_spec.template.wrapper_script_path) are injected into the
+    # container via environment variables
+    client_env = self.gke_env.copy()
+    client_env.update({
+        'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
+        'STRESS_TEST_IMAGE': client_pod_spec.template.client_image_path,
+        'STRESS_TEST_ARGS_STR': self._args_dict_to_str(
+            client_pod_spec.get_client_args_dict()),
+        'METRICS_CLIENT_IMAGE':
+            client_pod_spec.template.metrics_client_image_path,
+        'METRICS_CLIENT_ARGS_STR': self._args_dict_to_str(
+            client_pod_spec.template.metrics_args_dict),
+        'POLL_INTERVAL_SECS': str(client_pod_spec.template.poll_interval_secs)
+    })
+
+    for pod_name in client_pod_spec.pod_names():
+      client_env['POD_NAME'] = pod_name
+      print 'Creating client: %s' % pod_name
+      is_success = kubernetes_api.create_pod_and_service(
+          'localhost',
+          self.kubernetes_port,
+          'default',  # default namespace
+          pod_name,
+          client_pod_spec.docker_image.tag_name,
+          [client_pod_spec.template.metrics_port],  # Ports to expose on the pod
+          [container_cmd],
+          [],  # Empty args list since all args are passed via env variables
+          client_env,
+          False  # Client is not a headless service.
+      )
+
+      if not is_success:
+        print 'Error in launching client %s' % pod_name
+        break
+
+    if is_success:
+      print 'Successfully created all client(s)'
+
+    return is_success
+
+  def _delete_pods(self, pod_name_list):
+    is_success = True
+    for pod_name in pod_name_list:
+      print 'Deleting %s' % pod_name
+      is_success = kubernetes_api.delete_pod_and_service(
+          'localhost',
+          self.kubernetes_port,
+          'default',  # default namespace
+          pod_name)
+
+      if not is_success:
+        print 'Error in deleting pod %s' % pod_name
+        break
+
+    if is_success:
+      print 'Successfully deleted all pods'
+
+    return is_success
+
+  def delete_servers(self, server_pod_spec):
+    return self._delete_pods(server_pod_spec.pod_names())
+
+  def delete_clients(self, client_pod_spec):
+    return self._delete_pods(client_pod_spec.pod_names())
+
+
+class Config:
+
+  def __init__(self, config_filename, gcp_project_id):
+    print 'Loading configuration...'
+    config_dict = self._load_config(config_filename)
+
+    self.global_settings = self._parse_global_settings(config_dict,
+                                                       gcp_project_id)
+    self.docker_images_dict = self._parse_docker_images(
+        config_dict, self.global_settings.gcp_project_id)
+    self.client_templates_dict = self._parse_client_templates(config_dict)
+    self.server_templates_dict = self._parse_server_templates(config_dict)
+    self.server_pod_specs_dict = self._parse_server_pod_specs(
+        config_dict, self.docker_images_dict, self.server_templates_dict)
+    self.client_pod_specs_dict = self._parse_client_pod_specs(
+        config_dict, self.docker_images_dict, self.client_templates_dict,
+        self.server_pod_specs_dict)
+    print 'Loaded Configuration.'
+
+  def _parse_global_settings(self, config_dict, gcp_project_id):
+    global_settings_dict = config_dict['globalSettings']
+    return GlobalSettings(gcp_project_id,
+                          global_settings_dict['buildDockerImages'],
+                          global_settings_dict['pollIntervalSecs'],
+                          global_settings_dict['testDurationSecs'],
+                          global_settings_dict['kubernetesProxyPort'],
+                          global_settings_dict['datasetIdNamePrefix'],
+                          global_settings_dict['summaryTableId'],
+                          global_settings_dict['qpsTableId'],
+                          global_settings_dict['podWarmupSecs'])
+
+  def _parse_docker_images(self, config_dict, gcp_project_id):
+    """Parses the 'dockerImages' section of the config file and returns a
+    Dictionary of 'DockerImage' objects keyed by docker image names"""
+    docker_images_dict = {}
+
+    docker_config_dict = config_dict['dockerImages']
+    for image_name in docker_config_dict.keys():
+      build_script_path = docker_config_dict[image_name]['buildScript']
+      dockerfile_dir = docker_config_dict[image_name]['dockerFileDir']
+      build_type = docker_config_dict[image_name]['buildType']
+      docker_images_dict[image_name] = DockerImage(gcp_project_id, image_name,
+                                                   build_script_path,
+                                                   dockerfile_dir, build_type)
+    return docker_images_dict
+
+  def _parse_client_templates(self, config_dict):
+    """Parses the 'clientTemplates' section of the config file and returns a
+    Dictionary of 'ClientTemplate' objects keyed by client template names.
+
+    Note: The 'baseTemplates' sub section of the config file contains templates
+    with default values and the 'templates' sub section contains the actual
+    client templates (which refer to the base template name to use for default
+    values).
+ """ + client_templates_dict = {} + + templates_dict = config_dict['clientTemplates']['templates'] + base_templates_dict = config_dict['clientTemplates'].get('baseTemplates', + {}) + for template_name in templates_dict.keys(): + # temp_dict is a temporary dictionary that merges base template dictionary + # and client template dictionary (with client template dictionary values + # overriding base template values) + temp_dict = {} + + base_template_name = templates_dict[template_name].get('baseTemplate') + if not base_template_name is None: + temp_dict = base_templates_dict[base_template_name].copy() + + temp_dict.update(templates_dict[template_name]) + + # Create and add ClientTemplate object to the final client_templates_dict + client_templates_dict[template_name] = ClientTemplate( + template_name, temp_dict['clientImagePath'], + temp_dict['metricsClientImagePath'], temp_dict['metricsPort'], + temp_dict['wrapperScriptPath'], temp_dict['pollIntervalSecs'], + temp_dict['clientArgs'].copy(), temp_dict['metricsArgs'].copy()) + + return client_templates_dict + + def _parse_server_templates(self, config_dict): + """Parses the 'serverTemplates' section of the config file and returns a + Dictionary of 'serverTemplate' objects keyed by server template names. + + Note: The 'baseTemplates' sub section of the config file contains templates + with default values and the 'templates' sub section contains the actual + server templates (which refer to the base template name to use for default + values). + """ + server_templates_dict = {} + + templates_dict = config_dict['serverTemplates']['templates'] + base_templates_dict = config_dict['serverTemplates'].get('baseTemplates', + {}) + + for template_name in templates_dict.keys(): + # temp_dict is a temporary dictionary that merges base template dictionary + # and server template dictionary (with server template dictionary values + # overriding base template values) + temp_dict = {} + + base_template_name = templates_dict[template_name].get('baseTemplate') + if not base_template_name is None: + temp_dict = base_templates_dict[base_template_name].copy() + + temp_dict.update(templates_dict[template_name]) + + # Create and add ServerTemplate object to the final server_templates_dict + server_templates_dict[template_name] = ServerTemplate( + template_name, temp_dict['serverImagePath'], + temp_dict['wrapperScriptPath'], temp_dict['serverPort'], + temp_dict['serverArgs'].copy()) + + return server_templates_dict + + def _parse_server_pod_specs(self, config_dict, docker_images_dict, + server_templates_dict): + """Parses the 'serverPodSpecs' sub-section (under 'testMatrix' section) of + the config file and returns a Dictionary of 'ServerPodSpec' objects keyed + by server pod spec names""" + server_pod_specs_dict = {} + + pod_specs_dict = config_dict['testMatrix'].get('serverPodSpecs', {}) + + for pod_name in pod_specs_dict.keys(): + server_template_name = pod_specs_dict[pod_name]['serverTemplate'] + docker_image_name = pod_specs_dict[pod_name]['dockerImage'] + num_instances = pod_specs_dict[pod_name].get('numInstances', 1) + + # Create and add the ServerPodSpec object to the final + # server_pod_specs_dict + server_pod_specs_dict[pod_name] = ServerPodSpec( + pod_name, server_templates_dict[server_template_name], + docker_images_dict[docker_image_name], num_instances) + + return server_pod_specs_dict + + def _parse_client_pod_specs(self, config_dict, docker_images_dict, + client_templates_dict, server_pod_specs_dict): + """Parses the 'clientPodSpecs' sub-section 
+    the config file and returns a Dictionary of 'ClientPodSpec' objects keyed
+    by client pod spec names"""
+    client_pod_specs_dict = {}
+
+    pod_specs_dict = config_dict['testMatrix'].get('clientPodSpecs', {})
+    for pod_name in pod_specs_dict.keys():
+      client_template_name = pod_specs_dict[pod_name]['clientTemplate']
+      docker_image_name = pod_specs_dict[pod_name]['dockerImage']
+      num_instances = pod_specs_dict[pod_name]['numInstances']
+
+      # Get the server addresses from the server pod spec object
+      server_pod_spec_name = pod_specs_dict[pod_name]['serverPodSpec']
+      server_addresses = server_pod_specs_dict[
+          server_pod_spec_name].server_addresses()
+
+      client_pod_specs_dict[pod_name] = ClientPodSpec(
+          pod_name, client_templates_dict[client_template_name],
+          docker_images_dict[docker_image_name], num_instances,
+          server_addresses)
+
+    return client_pod_specs_dict
+
+  def _load_config(self, config_filename):
+    """Opens the config file and converts the Json text to Dictionary"""
+    if not os.path.isabs(config_filename):
+      raise Exception('Config object expects an absolute file path. '
+                      'config file name passed: %s' % config_filename)
+    with open(config_filename) as config_file:
+      return json.load(config_file)
+
+
+def run_tests(config):
+  """ The main function that launches the stress tests """
+  # Build docker images and push to GKE registry
+  if config.global_settings.build_docker_images:
+    for name, docker_image in config.docker_images_dict.iteritems():
+      if not (docker_image.build_image() and
+              docker_image.push_to_gke_registry()):
+        return False
+
+  # Create a unique id for this run (Note: Using timestamp instead of UUID to
+  # make it easier to deduce the date/time of the run just by looking at the
+  # run id. This is useful in debugging when looking at records in BigQuery)
+  run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
+  dataset_id = '%s_%s' % (config.global_settings.dataset_id_prefix, run_id)
+
+  bq_helper = BigQueryHelper(run_id, '', '',
+                             config.global_settings.gcp_project_id, dataset_id,
+                             config.global_settings.summary_table_id,
+                             config.global_settings.qps_table_id)
+  bq_helper.initialize()
+
+  gke = Gke(config.global_settings.gcp_project_id, run_id, dataset_id,
+            config.global_settings.summary_table_id,
+            config.global_settings.qps_table_id,
+            config.global_settings.kubernetes_proxy_port)
+
+  is_success = True
+
+  try:
+    print 'Launching servers..'
+    for name, server_pod_spec in config.server_pod_specs_dict.iteritems():
+      if not gke.launch_servers(server_pod_spec):
+        is_success = False  # is_success is checked in the 'finally' block
+        return False
+
+    print('Launched servers. Waiting for %d seconds for the server pods to be '
+          'fully online') % config.global_settings.pod_warmup_secs
+    time.sleep(config.global_settings.pod_warmup_secs)
+
+    for name, client_pod_spec in config.client_pod_specs_dict.iteritems():
+      if not gke.launch_clients(client_pod_spec):
+        is_success = False  # is_success is checked in the 'finally' block
+        return False
+
+    print('Launched all clients. Waiting for %d seconds for the client pods to '
+          'be fully online') % config.global_settings.pod_warmup_secs
+    time.sleep(config.global_settings.pod_warmup_secs)
+
+    start_time = datetime.datetime.now()
+    end_time = start_time + datetime.timedelta(
+        seconds=config.global_settings.test_duration_secs)
+    print 'Running the test until %s' % end_time.isoformat()
+
+    while True:
+      if datetime.datetime.now() > end_time:
+        print 'Test was run for %d seconds' % config.global_settings.test_duration_secs
+        break
+
+      # Check if either stress server or clients have failed (btw, the bq_helper
+      # monitors all the rows in the summary table and checks if any of them
+      # have a failure status)
+      if bq_helper.check_if_any_tests_failed():
+        is_success = False
+        print 'Some tests failed.'
+        break  # Don't 'return' here. We still want to call bq_helper to print qps/summary tables
+
+      # Tests running fine. Wait until next poll time to check the status
+      print 'Sleeping for %d seconds..' % config.global_settings.test_poll_interval_secs
+      time.sleep(config.global_settings.test_poll_interval_secs)
+
+    # Print BigQuery tables
+    bq_helper.print_qps_records()
+    bq_helper.print_summary_records()
+
+  finally:
+    # If there was a test failure, we should not delete the pods since they
+    # would contain useful debug information (logs, core dumps etc)
+    if is_success:
+      for name, server_pod_spec in config.server_pod_specs_dict.iteritems():
+        gke.delete_servers(server_pod_spec)
+      for name, client_pod_spec in config.client_pod_specs_dict.iteritems():
+        gke.delete_clients(client_pod_spec)
+
+  return is_success
+
+
+argp = argparse.ArgumentParser(
+    description='Launch stress tests in GKE',
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+argp.add_argument('--gcp_project_id',
+                  required=True,
+                  help='The Google Cloud Platform Project Id')
+argp.add_argument('--config_file',
+                  required=True,
+                  type=str,
+                  help='The test config file')
+
+if __name__ == '__main__':
+  args = argp.parse_args()
+
+  config_filename = args.config_file
+
+  # Since we will be changing the current working directory to grpc root in the
+  # next step, we should check if the config filename path is a relative path
+  # (i.e a path relative to the current working directory) and if so, convert it
+  # to an absolute path
+  if not os.path.isabs(config_filename):
+    config_filename = os.path.abspath(config_filename)
+
+  config = Config(config_filename, args.gcp_project_id)
+
+  # Change current working directory to grpc root
+  # (This is important because all relative file paths in the config file are
+  # supposed to be interpreted as relative to the GRPC root)
+  grpc_root = os.path.abspath(os.path.join(
+      os.path.dirname(sys.argv[0]), '../../..'))
+  os.chdir(grpc_root)
+
+  run_tests(config)
diff --git a/tools/run_tests/stress_test/run_stress_tests_on_gke.py b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
deleted file mode 100755
index 634eb1aca53..00000000000
--- a/tools/run_tests/stress_test/run_stress_tests_on_gke.py
+++ /dev/null
@@ -1,556 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015-2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
diff --git a/tools/run_tests/stress_test/run_stress_tests_on_gke.py b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
deleted file mode 100755
index 634eb1aca53..00000000000
--- a/tools/run_tests/stress_test/run_stress_tests_on_gke.py
+++ /dev/null
@@ -1,556 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015-2016, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-#     * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#     * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-#     * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-import argparse
-import datetime
-import os
-import subprocess
-import sys
-import time
-
-stress_test_utils_dir = os.path.abspath(os.path.join(
-    os.path.dirname(__file__), '../../gcp/stress_test'))
-sys.path.append(stress_test_utils_dir)
-from stress_test_utils import BigQueryHelper
-
-kubernetes_api_dir = os.path.abspath(os.path.join(
-    os.path.dirname(__file__), '../../gcp/utils'))
-sys.path.append(kubernetes_api_dir)
-
-import kubernetes_api
-
-_GRPC_ROOT = os.path.abspath(os.path.join(
-    os.path.dirname(sys.argv[0]), '../../..'))
-os.chdir(_GRPC_ROOT)
-
-# num of seconds to wait for the GKE image to start and warmup
-_GKE_IMAGE_WARMUP_WAIT_SECS = 60
-
-_SERVER_POD_NAME = 'stress-server'
-_CLIENT_POD_NAME_PREFIX = 'stress-client'
-_DATASET_ID_PREFIX = 'stress_test'
-_SUMMARY_TABLE_ID = 'summary'
-_QPS_TABLE_ID = 'qps'
-
-_DEFAULT_DOCKER_IMAGE_NAME = 'grpc_stress_test'
-
-# The default port on which the kubernetes proxy server is started on localhost
-# (i.e kubectl proxy --port=<port>)
-_DEFAULT_KUBERNETES_PROXY_PORT = 8001
-
-# How frequently should the stress client wrapper script (running inside a GKE
-# container) poll the health of the stress client (also running inside the GKE
-# container) and upload metrics to BigQuery
-_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS = 60
-
-# The default setting for stress test server and client
-_DEFAULT_STRESS_SERVER_PORT = 8080
-_DEFAULT_METRICS_PORT = 8081
-_DEFAULT_TEST_CASES_STR = 'empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1'
-_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
-_DEFAULT_NUM_STUBS_PER_CHANNEL = 10
-_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS = 30
-
-# Number of stress client instances to launch
-_DEFAULT_NUM_CLIENTS = 3
-
-# How frequently should this test monitor the health of Stress clients and
-# Servers running in GKE
-_DEFAULT_TEST_POLL_INTERVAL_SECS = 60
-
-# Default run time for this test (2 hour)
-_DEFAULT_TEST_DURATION_SECS = 7200
-
-# The number of seconds it would take a GKE pod to warm up (i.e get to 'Running'
-# state from the time of creation). Ideally this is something the test should
-# automatically determine by using Kubernetes API to poll the pods status.
-_DEFAULT_GKE_WARMUP_SECS = 60
-
-
-class KubernetesProxy:
-  """ Class to start a proxy on localhost to the Kubernetes API server """
-
-  def __init__(self, api_port):
-    self.port = api_port
-    self.p = None
-    self.started = False
-
-  def start(self):
-    cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
-    self.p = subprocess.Popen(args=cmd)
-    self.started = True
-    time.sleep(2)
-    print '..Started'
-
-  def get_port(self):
-    return self.port
-
-  def is_started(self):
-    return self.started
-
-  def __del__(self):
-    if self.p is not None:
-      print 'Shutting down Kubernetes proxy..'
-      self.p.kill()
-
-
-class TestSettings:
-
-  def __init__(self, build_docker_image, test_poll_interval_secs,
-               test_duration_secs, kubernetes_proxy_port):
-    self.build_docker_image = build_docker_image
-    self.test_poll_interval_secs = test_poll_interval_secs
-    self.test_duration_secs = test_duration_secs
-    self.kubernetes_proxy_port = kubernetes_proxy_port
-
-
-class GkeSettings:
-
-  def __init__(self, project_id, docker_image_name):
-    self.project_id = project_id
-    self.docker_image_name = docker_image_name
-    self.tag_name = 'gcr.io/%s/%s' % (project_id, docker_image_name)
-
-
-class BigQuerySettings:
-
-  def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
-    self.run_id = run_id
-    self.dataset_id = dataset_id
-    self.summary_table_id = summary_table_id
-    self.qps_table_id = qps_table_id
-
-
-class StressServerSettings:
-
-  def __init__(self, server_pod_name, server_port):
-    self.server_pod_name = server_pod_name
-    self.server_port = server_port
-
-
-class StressClientSettings:
-
-  def __init__(self, num_clients, client_pod_name_prefix, server_pod_name,
-               server_port, metrics_port, metrics_collection_interval_secs,
-               stress_client_poll_interval_secs, num_channels_per_server,
-               num_stubs_per_channel, test_cases_str):
-    self.num_clients = num_clients
-    self.client_pod_name_prefix = client_pod_name_prefix
-    self.server_pod_name = server_pod_name
-    self.server_port = server_port
-    self.metrics_port = metrics_port
-    self.metrics_collection_interval_secs = metrics_collection_interval_secs
-    self.stress_client_poll_interval_secs = stress_client_poll_interval_secs
-    self.num_channels_per_server = num_channels_per_server
-    self.num_stubs_per_channel = num_stubs_per_channel
-    self.test_cases_str = test_cases_str
-
-    # == Derived properties ==
-    # Note: Client can accept a list of server addresses (a comma separated list
-    # of 'server_name:server_port'). In this case, we only have one server
-    # address to pass
-    self.server_addresses = '%s.default.svc.cluster.local:%d' % (
-        server_pod_name, server_port)
-    self.client_pod_names_list = ['%s-%d' % (client_pod_name_prefix, i)
-                                  for i in range(1, num_clients + 1)]
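For context on the KubernetesProxy class above: kubectl proxy exposes the Kubernetes REST API over plain HTTP on localhost, which is how the launch and delete helpers below talk to the cluster. A minimal sketch in the same Python 2 style, assuming a proxy is already running on port 8001:

    import json
    import urllib2

    # List the pods in the 'default' namespace via the local kubectl proxy.
    resp = urllib2.urlopen('http://localhost:8001/api/v1/namespaces/default/pods')
    for item in json.loads(resp.read()).get('items', []):
      print item['metadata']['name'], item['status']['phase']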
-
-
-def _build_docker_image(image_name, tag_name):
-  """ Build the docker image and add tag it to the GKE repository """
-  print 'Building docker image: %s' % image_name
-  os.environ['INTEROP_IMAGE'] = image_name
-  os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = tag_name
-  # Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
-  # build_interop_stress_image.sh invokes the following script:
-  #   tools/dockerfile/$BASE_NAME/build_interop_stress.sh
-  os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
-  cmd = ['tools/jenkins/build_interop_stress_image.sh']
-  retcode = subprocess.call(args=cmd)
-  if retcode != 0:
-    print 'Error in building docker image'
-    return False
-  return True
-
-
-def _push_docker_image_to_gke_registry(docker_tag_name):
-  """Executes 'gcloud docker push <docker tag name>' to push the image to GKE registry"""
-  cmd = ['gcloud', 'docker', 'push', docker_tag_name]
-  print 'Pushing %s to GKE registry..' % docker_tag_name
-  retcode = subprocess.call(args=cmd)
-  if retcode != 0:
-    print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
-    return False
-  return True
-
-
-def _launch_server(gke_settings, stress_server_settings, bq_settings,
-                   kubernetes_proxy):
-  """ Launches a stress test server instance in GKE cluster """
-  if not kubernetes_proxy.is_started:
-    print 'Kubernetes proxy must be started before calling this function'
-    return False
-
-  # This is the wrapper script that is run in the container. This script runs
-  # the actual stress test server
-  server_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py']
-
-  # run_server.py does not take any args from the command line. The args are
-  # instead passed via environment variables (see server_env below)
-  server_arg_list = []
-
-  # The parameters to the script run_server.py are injected into the container
-  # via environment variables
-  server_env = {
-      'STRESS_TEST_IMAGE_TYPE': 'SERVER',
-      'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
-      'STRESS_TEST_ARGS_STR': '--port=%s' % stress_server_settings.server_port,
-      'RUN_ID': bq_settings.run_id,
-      'POD_NAME': stress_server_settings.server_pod_name,
-      'GCP_PROJECT_ID': gke_settings.project_id,
-      'DATASET_ID': bq_settings.dataset_id,
-      'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
-      'QPS_TABLE_ID': bq_settings.qps_table_id
-  }
-
-  # Launch Server
-  is_success = kubernetes_api.create_pod_and_service(
-      'localhost',
-      kubernetes_proxy.get_port(),
-      'default',  # Use 'default' namespace
-      stress_server_settings.server_pod_name,
-      gke_settings.tag_name,
-      [stress_server_settings.server_port],  # Port that should be exposed
-      server_cmd_list,
-      server_arg_list,
-      server_env,
-      True  # Headless = True for server. Since we want DNS records to be created by GKE
-  )
-
-  return is_success
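The environment dictionary above is the entire contract between this launcher and the run_server.py wrapper inside the container. The wrapper's source is not part of this diff; a hypothetical sketch of how the consumer side might read that contract back:

    import os

    # Hypothetical consumer side of the server_env dict above; defaults are
    # illustrative only.
    image = os.environ.get('STRESS_TEST_IMAGE',
                           '/var/local/git/grpc/bins/opt/interop_server')
    args_str = os.environ.get('STRESS_TEST_ARGS_STR', '--port=8080')
    cmd = [image] + args_str.split()  # e.g. ['.../interop_server', '--port=8080']
    run_id = os.environ.get('RUN_ID', '')  # used when writing BigQuery rows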
-
-
-def _launch_client(gke_settings, stress_server_settings, stress_client_settings,
-                   bq_settings, kubernetes_proxy):
-  """ Launches a configurable number of stress test clients on GKE cluster """
-  if not kubernetes_proxy.is_started:
-    print 'Kubernetes proxy must be started before calling this function'
-    return False
-
-  stress_client_arg_list = [
-      '--server_addresses=%s' % stress_client_settings.server_addresses,
-      '--test_cases=%s' % stress_client_settings.test_cases_str,
-      '--num_stubs_per_channel=%d' %
-      stress_client_settings.num_stubs_per_channel
-  ]
-
-  # This is the wrapper script that is run in the container. This script runs
-  # the actual stress client
-  client_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_client.py']
-
-  # run_client.py takes no args. All args are passed as env variables (see
-  # client_env)
-  client_arg_list = []
-
-  metrics_server_address = 'localhost:%d' % stress_client_settings.metrics_port
-  metrics_client_arg_list = [
-      '--metrics_server_address=%s' % metrics_server_address,
-      '--total_only=true'
-  ]
-
-  # The parameters to the script run_client.py are injected into the container
-  # via environment variables
-  client_env = {
-      'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
-      'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
-      'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
-      'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
-      'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
-      'RUN_ID': bq_settings.run_id,
-      'POLL_INTERVAL_SECS':
-          str(stress_client_settings.stress_client_poll_interval_secs),
-      'GCP_PROJECT_ID': gke_settings.project_id,
-      'DATASET_ID': bq_settings.dataset_id,
-      'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
-      'QPS_TABLE_ID': bq_settings.qps_table_id
-  }
-
-  for pod_name in stress_client_settings.client_pod_names_list:
-    client_env['POD_NAME'] = pod_name
-    is_success = kubernetes_api.create_pod_and_service(
-        'localhost',  # Since proxy is running on localhost
-        kubernetes_proxy.get_port(),
-        'default',  # default namespace
-        pod_name,
-        gke_settings.tag_name,
-        [stress_client_settings.metrics_port],  # Client pods expose metrics port
-        client_cmd_list,
-        client_arg_list,
-        client_env,
-        False  # Client is not a headless service
-    )
-    if not is_success:
-      print 'Error in launching client %s' % pod_name
-      return False
-
-  return True
-
-
-def _launch_server_and_client(gke_settings, stress_server_settings,
-                              stress_client_settings, bq_settings,
-                              kubernetes_proxy_port):
-  # Start kubernetes proxy
-  print 'Kubernetes proxy'
-  kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
-  kubernetes_proxy.start()
-
-  print 'Launching server..'
-  is_success = _launch_server(gke_settings, stress_server_settings, bq_settings,
-                              kubernetes_proxy)
-  if not is_success:
-    print 'Error in launching server'
-    return False
-
-  # Server takes a while to start.
-  # TODO(sree) Use Kubernetes API to query the status of the server instead of
-  # sleeping
-  print 'Waiting for %s seconds for the server to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
-  time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
-
-  # Launch client
-  client_pod_name_prefix = 'stress-client'
-  is_success = _launch_client(gke_settings, stress_server_settings,
-                              stress_client_settings, bq_settings,
-                              kubernetes_proxy)
-
-  if not is_success:
-    print 'Error in launching client(s)'
-    return False
-
-  print 'Waiting for %s seconds for the client images to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
-  time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
-  return True
-
-
-def _delete_server_and_client(stress_server_settings, stress_client_settings,
-                              kubernetes_proxy_port):
-  kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
-  kubernetes_proxy.start()
-
-  # Delete clients first
-  is_success = True
-  for pod_name in stress_client_settings.client_pod_names_list:
-    is_success = kubernetes_api.delete_pod_and_service(
-        'localhost', kubernetes_proxy_port, 'default', pod_name)
-    if not is_success:
-      return False
-
-  # Delete server
-  is_success = kubernetes_api.delete_pod_and_service(
-      'localhost', kubernetes_proxy_port, 'default',
-      stress_server_settings.server_pod_name)
-  return is_success
-
-
-def run_test_main(test_settings, gke_settings, stress_server_settings,
-                  stress_client_clients):
-  is_success = True
-
-  if test_settings.build_docker_image:
-    is_success = _build_docker_image(gke_settings.docker_image_name,
-                                     gke_settings.tag_name)
-    if not is_success:
-      return False
-
-    is_success = _push_docker_image_to_gke_registry(gke_settings.tag_name)
-    if not is_success:
-      return False
-
-  # Create a unique id for this run (Note: Using timestamp instead of UUID to
-  # make it easier to deduce the date/time of the run just by looking at the run
-  # run id. This is useful in debugging when looking at records in Biq query)
-  run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
-  dataset_id = '%s_%s' % (_DATASET_ID_PREFIX, run_id)
-
-  # Big Query settings (common for both Stress Server and Client)
-  bq_settings = BigQuerySettings(run_id, dataset_id, _SUMMARY_TABLE_ID,
-                                 _QPS_TABLE_ID)
-
-  bq_helper = BigQueryHelper(run_id, '', '', args.project_id, dataset_id,
-                             _SUMMARY_TABLE_ID, _QPS_TABLE_ID)
-  bq_helper.initialize()
-
-  try:
-    is_success = _launch_server_and_client(gke_settings, stress_server_settings,
-                                           stress_client_settings, bq_settings,
-                                           test_settings.kubernetes_proxy_port)
-    if not is_success:
-      return False
-
-    start_time = datetime.datetime.now()
-    end_time = start_time + datetime.timedelta(
-        seconds=test_settings.test_duration_secs)
-    print 'Running the test until %s' % end_time.isoformat()
-
-    while True:
-      if datetime.datetime.now() > end_time:
-        print 'Test was run for %d seconds' % test_settings.test_duration_secs
-        break
-
-      # Check if either stress server or clients have failed
-      if bq_helper.check_if_any_tests_failed():
-        is_success = False
-        print 'Some tests failed.'
-        break
-
-      # Things seem to be running fine. Wait until next poll time to check the
-      # status
-      print 'Sleeping for %d seconds..' % test_settings.test_poll_interval_secs
-      time.sleep(test_settings.test_poll_interval_secs)
-
-    # Print BiqQuery tables
-    bq_helper.print_summary_records()
-    bq_helper.print_qps_records()
-
-  finally:
-    # If is_success is False at this point, it means that the stress tests were
-    # started successfully but failed while running the tests. In this case we
-    # do should not delete the pods (since they contain all the failure
-    # information)
-    if is_success:
-      _delete_server_and_client(stress_server_settings, stress_client_settings,
-                                test_settings.kubernetes_proxy_port)
-
-  return is_success
-
-
-argp = argparse.ArgumentParser(
-    description='Launch stress tests in GKE',
-    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-argp.add_argument('--project_id',
-                  required=True,
-                  help='The Google Cloud Platform Project Id')
-argp.add_argument('--num_clients',
-                  default=1,
-                  type=int,
-                  help='Number of client instances to start')
-argp.add_argument('--docker_image_name',
-                  default=_DEFAULT_DOCKER_IMAGE_NAME,
-                  help='The name of the docker image containing stress client '
-                  'and stress servers')
-argp.add_argument('--build_docker_image',
-                  dest='build_docker_image',
-                  action='store_true',
-                  help='Build a docker image and push to Google Container '
-                  'Registry')
-argp.add_argument('--do_not_build_docker_image',
-                  dest='build_docker_image',
-                  action='store_false',
-                  help='Do not build and push docker image to Google Container '
-                  'Registry')
-argp.set_defaults(build_docker_image=True)
-
-argp.add_argument('--test_poll_interval_secs',
-                  default=_DEFAULT_TEST_POLL_INTERVAL_SECS,
-                  type=int,
-                  help='How frequently should this script should monitor the '
-                  'health of stress clients and servers running in the GKE '
-                  'cluster')
-argp.add_argument('--test_duration_secs',
-                  default=_DEFAULT_TEST_DURATION_SECS,
-                  type=int,
-                  help='How long should this test be run')
-argp.add_argument('--kubernetes_proxy_port',
-                  default=_DEFAULT_KUBERNETES_PROXY_PORT,
-                  type=int,
-                  help='The port on which the kubernetes proxy (on localhost)'
-                  ' is started')
-argp.add_argument('--stress_server_port',
-                  default=_DEFAULT_STRESS_SERVER_PORT,
-                  type=int,
-                  help='The port on which the stress server (in GKE '
-                  'containers) listens')
-argp.add_argument('--stress_client_metrics_port',
-                  default=_DEFAULT_METRICS_PORT,
-                  type=int,
-                  help='The port on which the stress clients (in GKE '
-                  'containers) expose metrics')
-argp.add_argument('--stress_client_poll_interval_secs',
-                  default=_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS,
-                  type=int,
-                  help='How frequently should the stress client wrapper script'
-                  ' running inside GKE should monitor health of the actual '
-                  ' stress client process and upload the metrics to BigQuery')
-argp.add_argument('--stress_client_metrics_collection_interval_secs',
-                  default=_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS,
-                  type=int,
-                  help='How frequently should metrics be collected in-memory on'
-                  ' the stress clients (running inside GKE containers). Note '
-                  'that this is NOT the same as the upload-to-BigQuery '
-                  'frequency. The metrics upload frequency is controlled by the'
-                  ' --stress_client_poll_interval_secs flag')
-argp.add_argument('--stress_client_num_channels_per_server',
-                  default=_DEFAULT_NUM_CHANNELS_PER_SERVER,
-                  type=int,
-                  help='The number of channels created to each server from a '
-                  'stress client')
-argp.add_argument('--stress_client_num_stubs_per_channel',
-                  default=_DEFAULT_NUM_STUBS_PER_CHANNEL,
-                  type=int,
-                  help='The number of stubs created per channel. This number '
-                  'indicates the max number of RPCs that can be made in '
-                  'parallel on each channel at any given time')
-argp.add_argument('--stress_client_test_cases',
-                  default=_DEFAULT_TEST_CASES_STR,
-                  help='List of test cases (with weights) to be executed by the'
-                  ' stress test client. The list is in the following format:\n'
-                  '  <testcase_name:weight>,<testcase_name:weight>...\n'
-                  '  (Note: The weights do not have to add up to 100)')
-
-if __name__ == '__main__':
-  args = argp.parse_args()
-
-  test_settings = TestSettings(
-      args.build_docker_image, args.test_poll_interval_secs,
-      args.test_duration_secs, args.kubernetes_proxy_port)
-
-  gke_settings = GkeSettings(args.project_id, args.docker_image_name)
-
-  stress_server_settings = StressServerSettings(_SERVER_POD_NAME,
-                                                args.stress_server_port)
-  stress_client_settings = StressClientSettings(
-      args.num_clients, _CLIENT_POD_NAME_PREFIX, _SERVER_POD_NAME,
-      args.stress_server_port, args.stress_client_metrics_port,
-      args.stress_client_metrics_collection_interval_secs,
-      args.stress_client_poll_interval_secs,
-      args.stress_client_num_channels_per_server,
-      args.stress_client_num_stubs_per_channel, args.stress_client_test_cases)
-
-  run_test_main(test_settings, gke_settings, stress_server_settings,
-                stress_client_settings)
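Taken together, this deletion replaces a flag-driven launcher with the config-driven run_tests() entry point earlier in the diff. For comparison, hypothetical invocations of each (the new script's filename does not appear in this hunk and is assumed):

    # Old (deleted) launcher: topology described via flags
    #   $ tools/run_tests/stress_test/run_stress_tests_on_gke.py \
    #       --project_id=my-project --num_clients=3
    #
    # New launcher: topology described in a JSON config file
    #   $ tools/run_tests/stress_test/run_on_gke.py \
    #       --gcp_project_id=my-project --config_file=config.json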