Merge pull request #6643 from jtattermusch/csharp_profiling2

Simplify profiling in C# qps clients.
pull/6663/head
Jan Tattermusch 9 years ago
commit ec3e616e33
  1. 6
      src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
  2. 55
      src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
  3. 2
      src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs

@ -16,6 +16,12 @@ using System.Runtime.CompilerServices;
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
[assembly: InternalsVisibleTo("Grpc.IntegrationTesting,PublicKey=" +
"00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
#else
[assembly: InternalsVisibleTo("Grpc.Core.Tests")]
[assembly: InternalsVisibleTo("Grpc.IntegrationTesting")]
#endif

@ -32,6 +32,7 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
@ -41,7 +42,9 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@ -55,6 +58,15 @@ namespace Grpc.IntegrationTesting
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
// Profilers to use for clients.
static readonly BlockingCollection<BasicProfiler> profilers = new BlockingCollection<BasicProfiler>();
internal static void AddProfiler(BasicProfiler profiler)
{
GrpcPreconditions.CheckNotNull(profiler);
profilers.Add(profiler);
}
/// <summary>
/// Creates a started client runner.
/// </summary>
@ -83,7 +95,8 @@ namespace Grpc.IntegrationTesting
config.OutstandingRpcsPerChannel,
config.LoadParams,
config.PayloadConfig,
config.HistogramParams);
config.HistogramParams,
() => GetNextProfiler());
}
private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams)
@ -110,9 +123,16 @@ namespace Grpc.IntegrationTesting
}
return result;
}
private static BasicProfiler GetNextProfiler()
{
BasicProfiler result = null;
profilers.TryTake(out result);
return result;
}
}
public class ClientRunnerImpl : IClientRunner
internal class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
@ -125,8 +145,9 @@ namespace Grpc.IntegrationTesting
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
readonly AtomicCounter statsResetCount = new AtomicCounter();
public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams)
public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory)
{
GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
GrpcPreconditions.CheckNotNull(histogramParams, "histogramParams");
@ -142,7 +163,8 @@ namespace Grpc.IntegrationTesting
for (int i = 0; i < outstandingRpcsPerChannel; i++)
{
var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
this.runnerTasks.Add(RunClientAsync(channel, timer));
var optionalProfiler = profilerFactory();
this.runnerTasks.Add(RunClientAsync(channel, timer, optionalProfiler));
}
}
}
@ -152,6 +174,11 @@ namespace Grpc.IntegrationTesting
var histogramData = histogram.GetSnapshot(reset);
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset)
{
statsResetCount.Increment();
}
// TODO: populate user time and system time
return new ClientStats
{
@ -175,14 +202,28 @@ namespace Grpc.IntegrationTesting
}
}
private void RunUnary(Channel channel, IInterarrivalTimer timer)
private void RunUnary(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
{
if (optionalProfiler != null)
{
Profilers.SetForCurrentThread(optionalProfiler);
}
bool profilerReset = false;
var client = BenchmarkService.NewClient(channel);
var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
{
// after the first stats reset, also reset the profiler.
if (optionalProfiler != null && !profilerReset && statsResetCount.Count > 0)
{
optionalProfiler.Reset();
profilerReset = true;
}
stopwatch.Restart();
client.UnaryCall(request);
stopwatch.Stop();
@ -268,7 +309,7 @@ namespace Grpc.IntegrationTesting
}
}
private Task RunClientAsync(Channel channel, IInterarrivalTimer timer)
private Task RunClientAsync(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler)
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
@ -282,7 +323,7 @@ namespace Grpc.IntegrationTesting
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.Unary, "Sync client can only be used for Unary calls in C#");
// create a dedicated thread for the synchronous client
return Task.Factory.StartNew(() => RunUnary(channel, timer), TaskCreationOptions.LongRunning);
return Task.Factory.StartNew(() => RunUnary(channel, timer, optionalProfiler), TaskCreationOptions.LongRunning);
}
else if (clientType == ClientType.AsyncClient)
{

@ -64,7 +64,7 @@ namespace Grpc.Testing
{
Stats = runner.GetStats(false),
Port = runner.BoundPort,
Cores = 0, // TODO: set number of cores
Cores = Environment.ProcessorCount,
});
while (await requestStream.MoveNext())

Loading…
Cancel
Save