From d7079b20080646a11a4878ab156912777fac3248 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 16 May 2017 19:55:57 +0200 Subject: [PATCH 1/2] cache byteBufRequest for generic C# qps client --- src/csharp/Grpc.IntegrationTesting/ClientRunners.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index c9f7c42b71d..85d58ec287e 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -140,6 +140,7 @@ namespace Grpc.IntegrationTesting readonly ClientType clientType; readonly RpcType rpcType; readonly PayloadConfig payloadConfig; + readonly Lazy cachedByteBufferRequest; readonly Histogram histogram; readonly List runnerTasks; @@ -155,6 +156,7 @@ namespace Grpc.IntegrationTesting this.clientType = clientType; this.rpcType = rpcType; this.payloadConfig = payloadConfig; + this.cachedByteBufferRequest = new Lazy(() => new byte[payloadConfig.BytebufParams.ReqSize]); this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); this.runnerTasks = new List(); @@ -286,7 +288,7 @@ namespace Grpc.IntegrationTesting private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer) { - var request = CreateByteBufferRequest(); + var request = cachedByteBufferRequest.Value; var stopwatch = new Stopwatch(); var callDetails = new CallInvocationDetails(channel, GenericService.StreamingCallMethod, new CallOptions()); @@ -351,11 +353,6 @@ namespace Grpc.IntegrationTesting }; } - private byte[] CreateByteBufferRequest() - { - return new byte[payloadConfig.BytebufParams.ReqSize]; - } - private static Payload CreateZerosPayload(int size) { return new Payload { Body = ByteString.CopyFrom(new byte[size]) }; From 19e3d3ba9f1c8e71793e2ad4f648c363d85438f2 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 17 May 2017 11:05:06 +0200 Subject: [PATCH 2/2] get rid of Histogram lock contention in qps worker --- .../Grpc.IntegrationTesting/ClientRunners.cs | 19 ++++--- .../Grpc.IntegrationTesting/Histogram.cs | 57 ++++++++++++++----- .../Grpc.IntegrationTesting/HistogramTest.cs | 26 ++++++++- 3 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 85d58ec287e..8a44f8d68f6 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -141,7 +141,7 @@ namespace Grpc.IntegrationTesting readonly RpcType rpcType; readonly PayloadConfig payloadConfig; readonly Lazy cachedByteBufferRequest; - readonly Histogram histogram; + readonly ThreadLocal threadLocalHistogram; readonly List runnerTasks; readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); @@ -157,7 +157,7 @@ namespace Grpc.IntegrationTesting this.rpcType = rpcType; this.payloadConfig = payloadConfig; this.cachedByteBufferRequest = new Lazy(() => new byte[payloadConfig.BytebufParams.ReqSize]); - this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible); + this.threadLocalHistogram = new ThreadLocal(() => new Histogram(histogramParams.Resolution, histogramParams.MaxPossible), true); this.runnerTasks = new List(); foreach (var channel in this.channels) @@ -173,7 +173,12 @@ namespace Grpc.IntegrationTesting public ClientStats GetStats(bool reset) { - var histogramData = histogram.GetSnapshot(reset); + var histogramData = new HistogramData(); + foreach (var hist in threadLocalHistogram.Values) + { + hist.GetSnapshot(histogramData, reset); + } + var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; if (reset) @@ -234,7 +239,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); timer.WaitForNext(); } @@ -253,7 +258,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } @@ -275,7 +280,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } @@ -303,7 +308,7 @@ namespace Grpc.IntegrationTesting stopwatch.Stop(); // spec requires data point in nanoseconds. - histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + threadLocalHistogram.Value.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); await timer.WaitForNextAsync(); } diff --git a/src/csharp/Grpc.IntegrationTesting/Histogram.cs b/src/csharp/Grpc.IntegrationTesting/Histogram.cs index 28d1f078a93..9d33c497e6b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Histogram.cs +++ b/src/csharp/Grpc.IntegrationTesting/Histogram.cs @@ -84,15 +84,27 @@ namespace Grpc.IntegrationTesting } } - /// - /// Gets snapshot of stats and reset + /// Gets snapshot of stats and optionally resets the histogram. /// public HistogramData GetSnapshot(bool reset = false) { lock (myLock) { - return GetSnapshotUnsafe(reset); + var histogramData = new HistogramData(); + GetSnapshotUnsafe(histogramData, reset); + return histogramData; + } + } + + /// + /// Merges snapshot of stats into mergeTo and optionally resets the histogram. + /// + public void GetSnapshot(HistogramData mergeTo, bool reset) + { + lock (myLock) + { + GetSnapshotUnsafe(mergeTo, reset); } } @@ -117,24 +129,39 @@ namespace Grpc.IntegrationTesting this.buckets[FindBucket(value)]++; } - private HistogramData GetSnapshotUnsafe(bool reset) + private void GetSnapshotUnsafe(HistogramData mergeTo, bool reset) { - var data = new HistogramData + GrpcPreconditions.CheckArgument(mergeTo.Bucket.Count == 0 || mergeTo.Bucket.Count == buckets.Length); + if (mergeTo.Count == 0) { - Count = count, - Sum = sum, - SumOfSquares = sumOfSquares, - MinSeen = min, - MaxSeen = max, - Bucket = { buckets } - }; + mergeTo.MinSeen = min; + mergeTo.MaxSeen = max; + } + else + { + mergeTo.MinSeen = Math.Min(mergeTo.MinSeen, min); + mergeTo.MaxSeen = Math.Max(mergeTo.MaxSeen, max); + } + mergeTo.Count += count; + mergeTo.Sum += sum; + mergeTo.SumOfSquares += sumOfSquares; - if (reset) + if (mergeTo.Bucket.Count == 0) { - ResetUnsafe(); + mergeTo.Bucket.AddRange(buckets); + } + else + { + for (int i = 0; i < buckets.Length; i++) + { + mergeTo.Bucket[i] += buckets[i]; + } } - return data; + if (reset) + { + ResetUnsafe(); + } } private void ResetUnsafe() diff --git a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs index fa160cbd15b..e8a2ed0c5b9 100644 --- a/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/HistogramTest.cs @@ -73,13 +73,37 @@ namespace Grpc.IntegrationTesting { var hist = new Histogram(0.01, 60e9); hist.AddObservation(-0.5); // should be in the first bucket - hist.AddObservation(1e12); // should be in the last bucket + hist.AddObservation(1e12); // should be in the last bucket var data = hist.GetSnapshot(); Assert.AreEqual(1, data.Bucket[0]); Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]); } + [Test] + public void MergeSnapshots() + { + var data = new HistogramData(); + + var hist1 = new Histogram(0.01, 60e9); + hist1.AddObservation(-0.5); // should be in the first bucket + hist1.AddObservation(1e12); // should be in the last bucket + hist1.GetSnapshot(data, false); + + var hist2 = new Histogram(0.01, 60e9); + hist2.AddObservation(10000); + hist2.AddObservation(11000); + hist2.GetSnapshot(data, false); + + Assert.AreEqual(4, data.Count); + Assert.AreEqual(-0.5, data.MinSeen); + Assert.AreEqual(1e12, data.MaxSeen); + Assert.AreEqual(1, data.Bucket[0]); + Assert.AreEqual(1, data.Bucket[925]); + Assert.AreEqual(1, data.Bucket[935]); + Assert.AreEqual(1, data.Bucket[data.Bucket.Count - 1]); + } + [Test] public void Reset() {