Merge pull request #11155 from jtattermusch/csharp_dont_alloc_request

C# qps worker improvements
pull/11158/head^2
Jan Tattermusch 8 years ago committed by GitHub
commit 495cf83c86
  1. 28
      src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
  2. 57
      src/csharp/Grpc.IntegrationTesting/Histogram.cs
  3. 26
      src/csharp/Grpc.IntegrationTesting/HistogramTest.cs

@ -140,7 +140,8 @@ namespace Grpc.IntegrationTesting
readonly ClientType clientType;
readonly RpcType rpcType;
readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly Lazy<byte[]> cachedByteBufferRequest;
readonly ThreadLocal<Histogram> threadLocalHistogram;
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
@ -155,7 +156,8 @@ namespace Grpc.IntegrationTesting
this.clientType = clientType;
this.rpcType = rpcType;
this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.cachedByteBufferRequest = new Lazy<byte[]>(() => new byte[payloadConfig.BytebufParams.ReqSize]);
this.threadLocalHistogram = new ThreadLocal<Histogram>(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true);
this.runnerTasks = new List<Task>();
foreach (var channel in this.channels)
@ -171,7 +173,12 @@ namespace Grpc.IntegrationTesting
public ClientStats GetStats(bool reset)
{
var histogramData = histogram.GetSnapshot(reset);
var histogramData = new HistogramData();
foreach (var hist in threadLocalHistogram.Values)
{
hist.GetSnapshot(histogramData, reset);
}
var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
if (reset)
@ -232,7 +239,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
timer.WaitForNext();
}
@ -251,7 +258,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@ -273,7 +280,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@ -286,7 +293,7 @@ namespace Grpc.IntegrationTesting
private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
{
var request = CreateByteBufferRequest();
var request = cachedByteBufferRequest.Value;
var stopwatch = new Stopwatch();
var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
@ -301,7 +308,7 @@ namespace Grpc.IntegrationTesting
stopwatch.Stop();
// spec requires data point in nanoseconds.
histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
await timer.WaitForNextAsync();
}
@ -351,11 +358,6 @@ namespace Grpc.IntegrationTesting
};
}
private byte[] CreateByteBufferRequest()
{
return new byte[payloadConfig.BytebufParams.ReqSize];
}
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };

@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting
}
}
/// <summary>
/// Gets snapshot of stats and reset
/// Gets snapshot of stats and optionally resets the histogram.
/// </summary>
public HistogramData GetSnapshot(bool reset = false)
{
lock (myLock)
{
return GetSnapshotUnsafe(reset);
var histogramData = new HistogramData();
GetSnapshotUnsafe(histogramData, reset);
return histogramData;
}
}
/// <summary>
/// Merges snapshot of stats into <c>mergeTo</c> and optionally resets the histogram.
/// </summary>
public void GetSnapshot(HistogramData mergeTo, bool reset)
{
lock (myLock)
{
GetSnapshotUnsafe(mergeTo, reset);
}
}
@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting
this.buckets[FindBucket(value)]++;
}
private HistogramData GetSnapshotUnsafe(bool reset)
private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset)
{
var data = new HistogramData
GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length);
if (mergeTo.Count == 0)
{
Count = count,
Sum = sum,
SumOfSquares = sumOfSquares,
MinSeen = min,
MaxSeen = max,
Bucket = { buckets }
};
mergeTo.MinSeen = min;
mergeTo.MaxSeen = max;
}
else
{
mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min);
mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max);
}
mergeTo.Count += count;
mergeTo.Sum += sum;
mergeTo.SumOfSquares += sumOfSquares;
if (reset)
if (mergeTo.Bucket.Count == 0)
{
ResetUnsafe();
mergeTo.Bucket.AddRange(buckets);
}
else
{
for (int i = 0; i < buckets.Length; i++)
{
mergeTo.Bucket[i] += buckets[i];
}
}
return data;
if (reset)
{
ResetUnsafe();
}
}
private void ResetUnsafe()

@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting
{
var hist = new Histogram(0.01, 60e9);
hist.AddObservation(-0.5); // should be in the first bucket
hist.AddObservation(1e12); // should be in the last bucket
hist.AddObservation(1e12); // should be in the last bucket
var data = hist.GetSnapshot();
Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
}
[Test]
public void MergeSnapshots()
{
var data = new HistogramData();
var hist1 = new Histogram(0.01, 60e9);
hist1.AddObservation(-0.5); // should be in the first bucket
hist1.AddObservation(1e12); // should be in the last bucket
hist1.GetSnapshot(data, false);
var hist2 = new Histogram(0.01, 60e9);
hist2.AddObservation(10000);
hist2.AddObservation(11000);
hist2.GetSnapshot(data, false);
Assert.AreEqual(4, data.Count);
Assert.AreEqual(-0.5, data.MinSeen);
Assert.AreEqual(1e12, data.MaxSeen);
Assert.AreEqual(1, data.Bucket[0]);
Assert.AreEqual(1, data.Bucket[925]);
Assert.AreEqual(1, data.Bucket[935]);
Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]);
}
[Test]
public void Reset()
{

Loading…
Cancel
Save