Merge pull request #5904 from jtattermusch/csharp_perf_intergrate

C# performance worker improvements
pull/5846/head
Jan Tattermusch 9 years ago
commit d2eb23974d
  1. 11
      src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
  2. 195
      src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
  3. 71
      src/csharp/Grpc.IntegrationTesting/GenericService.cs
  4. 1
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  5. 12
      src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
  6. 65
      src/csharp/Grpc.IntegrationTesting/ServerRunners.cs

@ -1,6 +1,6 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@ -46,16 +46,13 @@ namespace Grpc.Testing
/// </summary>
public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService
{
private readonly int responseSize;
public BenchmarkServiceImpl(int responseSize)
public BenchmarkServiceImpl()
{
this.responseSize = responseSize;
}
public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
return Task.FromResult(response);
}
@ -63,7 +60,7 @@ namespace Grpc.Testing
{
await requestStream.ForEachAsync(async request =>
{
var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
await responseStream.WriteAsync(response);
});
}

@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start client runners for performance testing.
/// </summary>
public static class ClientRunners
public class ClientRunners
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
/// <summary>
/// Creates a started client runner.
/// </summary>
public static IClientRunner CreateStarted(ClientConfig config)
{
Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop);
GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
"Only closed loop scenario supported for C#");
GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
var channel = new Channel(target, credentials);
if (config.OutstandingRpcsPerChannel != 0)
{
Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
}
if (config.AsyncClientThreads != 0)
{
Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
}
if (config.CoreLimit != 0)
{
Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
}
if (config.CoreList.Count > 0)
{
Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
}
switch (config.RpcType)
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
List<ChannelOption> channelOptions = null;
if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
{
case RpcType.UNARY:
return new SyncUnaryClientRunner(channel,
config.PayloadConfig.SimpleParams.ReqSize,
config.HistogramParams);
case RpcType.STREAMING:
default:
throw new ArgumentException("Unsupported RpcType.");
channelOptions = new List<ChannelOption>
{
new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
};
}
var channel = new Channel(target, credentials, channelOptions);
return new ClientRunnerImpl(channel,
config.ClientType,
config.RpcType,
config.PayloadConfig,
config.HistogramParams);
}
}
/// <summary>
/// Client that starts synchronous unary calls in a closed loop.
/// </summary>
public class SyncUnaryClientRunner : IClientRunner
public class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
readonly int payloadSize;
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.IBenchmarkServiceClient client;
@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams)
public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
this.payloadSize = payloadSize;
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
var threadBody = GetThreadBody();
this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
}
public ClientStats GetStats(bool reset)
@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting
await channel.ShutdownAsync();
}
private void Run()
private void RunClosedLoopUnary()
{
var request = new SimpleRequest
{
Payload = CreateZerosPayload(payloadSize)
};
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting
}
}
private async Task RunClosedLoopUnaryAsync()
{
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await client.UnaryCallAsync(request);
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
}
private async Task RunClosedLoopStreamingAsync()
{
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
using (var call = client.StreamingCall())
{
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await call.RequestStream.WriteAsync(request);
await call.ResponseStream.MoveNext();
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
// finish the streaming call
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
}
private async Task RunGenericClosedLoopStreamingAsync()
{
var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
{
while (!stoppedCts.Token.IsCancellationRequested)
{
stopwatch.Restart();
await call.RequestStream.WriteAsync(request);
await call.ResponseStream.MoveNext();
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
}
// finish the streaming call
await call.RequestStream.CompleteAsync();
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
}
private Action GetThreadBody()
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
return () =>
{
RunGenericClosedLoopStreamingAsync().Wait();
};
}
GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
return RunClosedLoopUnary;
}
else if (clientType == ClientType.ASYNC_CLIENT)
{
switch (rpcType)
{
case RpcType.UNARY:
return () =>
{
RunClosedLoopUnaryAsync().Wait();
};
case RpcType.STREAMING:
return () =>
{
RunClosedLoopStreamingAsync().Wait();
};
}
}
throw new ArgumentException("Unsupported configuration.");
}
private SimpleRequest CreateSimpleRequest()
{
GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
return new SimpleRequest
{
Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
ResponseSize = payloadConfig.SimpleParams.RespSize
};
}
private byte[] CreateByteBufferRequest()
{
return new byte[payloadConfig.BytebufParams.ReqSize];
}
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };

@ -0,0 +1,71 @@
#region Copyright notice and license
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
namespace Grpc.IntegrationTesting
{
/// <summary>
/// Utility methods for defining and calling a service that doesn't use protobufs
/// for serialization/deserialization.
/// </summary>
public static class GenericService
{
readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
MethodType.DuplexStreaming,
"grpc.testing.BenchmarkService",
"StreamingCall",
ByteArrayMarshaller,
ByteArrayMarshaller
);
public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
{
return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
.AddMethod(StreamingCallMethod, handler).Build();
}
}
}

@ -120,6 +120,7 @@
<Compile Include="WorkerServiceImpl.cs" />
<Compile Include="QpsWorker.cs" />
<Compile Include="WallClockStopwatch.cs" />
<Compile Include="GenericService.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting
{
var serverConfig = new ServerConfig
{
ServerType = ServerType.ASYNC_SERVER,
PayloadConfig = new PayloadConfig
{
SimpleParams = new SimpleProtoParams
{
RespSize = 100
}
}
ServerType = ServerType.ASYNC_SERVER
};
serverRunner = ServerRunners.CreateStarted(serverConfig);
}
@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting
{
SimpleParams = new SimpleProtoParams
{
ReqSize = 100
ReqSize = 100,
RespSize = 100
}
},
HistogramParams = new HistogramParams

@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start server runners for performance testing.
/// </summary>
public static class ServerRunners
public class ServerRunners
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>();
/// <summary>
/// Creates a started server runner.
/// </summary>
public static IServerRunner CreateStarted(ServerConfig config)
{
GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER);
Logger.Debug("ServerConfig: {0}", config);
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
// TODO: qps_driver needs to setup payload properly...
int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0;
if (config.AsyncServerThreads != 0)
{
Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value");
}
if (config.CoreLimit != 0)
{
Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value");
}
if (config.CoreList.Count > 0)
{
Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
}
ServerServiceDefinition service = null;
if (config.ServerType == ServerType.ASYNC_SERVER)
{
GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
"ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
service = BenchmarkService.BindService(new BenchmarkServiceImpl());
}
else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
{
var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
service = GenericService.BindHandler(genericService.StreamingCall);
}
else
{
throw new ArgumentException("Unsupported ServerType");
}
var server = new Server
{
Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
Services = { service },
Ports = { new ServerPort("[::]", config.Port, credentials) }
};
server.Start();
return new ServerRunnerImpl(server);
}
private class GenericServiceImpl
{
readonly byte[] response;
public GenericServiceImpl(int responseSize)
{
this.response = new byte[responseSize];
}
/// <summary>
/// Generic streaming call handler.
/// </summary>
public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
{
await requestStream.ForEachAsync(async request =>
{
await responseStream.WriteAsync(response);
});
}
}
}
/// <summary>
@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting
{
return server.ShutdownAsync();
}
}
}
}

Loading…
Cancel
Save