diff --git a/src/csharp/Grpc.Microbenchmarks/CommonThreadedBase.cs b/src/csharp/Grpc.Microbenchmarks/CommonThreadedBase.cs
index c6bdf471b8a..c42c6481ffb 100644
--- a/src/csharp/Grpc.Microbenchmarks/CommonThreadedBase.cs
+++ b/src/csharp/Grpc.Microbenchmarks/CommonThreadedBase.cs
@@ -17,6 +17,8 @@
 #endregion
 
 using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
 using BenchmarkDotNet.Attributes;
@@ -35,22 +37,42 @@ namespace Grpc.Microbenchmarks
     {
         protected virtual bool NeedsEnvironment => true;
 
-        [Params(1, 2, 4, 8, 12)]
+        [Params(1, 2, 4, 6)]
         public int ThreadCount { get; set; }
 
         protected GrpcEnvironment Environment { get; private set; }
 
+        private List<Thread> workers;
+
+        private List<BlockingCollection<Action>> dispatchQueues;
+
         [GlobalSetup]
         public virtual void Setup()
         {
-            ThreadPool.GetMinThreads(out var workers, out var iocp);
-            if (workers <= ThreadCount) ThreadPool.SetMinThreads(ThreadCount + 1, iocp);
+            dispatchQueues = new List<BlockingCollection<Action>>();
+            workers = new List<Thread>();
+            for (int i = 0; i < ThreadCount; i++)
+            {
+                var dispatchQueue = new BlockingCollection<Action>();
+                var thread = new Thread(new ThreadStart(() => WorkerThreadBody(dispatchQueue)));
+                thread.Name = string.Format("threaded benchmark worker {0}", i);
+                thread.Start();
+                workers.Add(thread);
+                dispatchQueues.Add(dispatchQueue);
+            }
+
             if (NeedsEnvironment) Environment = GrpcEnvironment.AddRef();
         }
 
         [GlobalCleanup]
         public virtual void Cleanup()
         {
+            for (int i = 0; i < ThreadCount; i++)
+            {
+                dispatchQueues[i].Add(null);  // a null action requests termination of the worker thread.
+                workers[i].Join();
+            }
+
             if (Environment != null)
             {
                 Environment = null;
@@ -58,9 +80,50 @@ namespace Grpc.Microbenchmarks
             }
         }
 
+        /// <summary>
+        /// Runs the operation in parallel (once on each worker thread).
+        /// This method tries to incur as little
+        /// overhead as possible, but there is some inherent overhead
+        /// that is hard to avoid (thread hop etc.). Therefore it is strongly
+        /// recommended that the benchmarked operation runs long enough to
+        /// make this overhead negligible.
+        /// </summary>
         protected void RunConcurrent(Action operation)
         {
-            Parallel.For(0, ThreadCount, _ => operation());
+            var workItemTasks = new Task[ThreadCount];
+            for (int i = 0; i < ThreadCount; i++)
+            {
+                var tcs = new TaskCompletionSource<object>();
+                var workItem = new Action(() =>
+                {
+                    try
+                    {
+                        operation();
+                        tcs.SetResult(null);
+                    }
+                    catch (Exception e)
+                    {
+                        tcs.SetException(e);
+                    }
+                });
+                workItemTasks[i] = tcs.Task;
+                dispatchQueues[i].Add(workItem);
+            }
+            Task.WaitAll(workItemTasks);
+        }
+
+        private void WorkerThreadBody(BlockingCollection<Action> dispatchQueue)
+        {
+            while (true)
+            {
+                var workItem = dispatchQueue.Take();
+                if (workItem == null)
+                {
+                    // stop the worker if a null action was provided
+                    break;
+                }
+                workItem();
+            }
         }
     }
 }
diff --git a/src/csharp/Grpc.Microbenchmarks/ScalabityExampleBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/ScalabityExampleBenchmark.cs
new file mode 100644
index 00000000000..1ef117c83e7
--- /dev/null
+++ b/src/csharp/Grpc.Microbenchmarks/ScalabityExampleBenchmark.cs
@@ -0,0 +1,56 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using BenchmarkDotNet.Attributes;
+using Grpc.Core.Internal;
+
+namespace Grpc.Microbenchmarks
+{
+    public class ScalabilityExampleBenchmark : CommonThreadedBase
+    {
+        protected override bool NeedsEnvironment => false;
+
+        // An example of testing scalability of a method that scales perfectly.
+        // This method provides a baseline for how well CommonThreadedBase
+        // can measure scalability.
+        const int Iterations = 50 * 1000 * 1000;  // High number to make the overhead of RunConcurrent negligible.
+        [Benchmark(OperationsPerInvoke = Iterations)]
+        public void PerfectScalingExample()
+        {
+            RunConcurrent(() => { RunBody(); });
+        }
+
+        private int RunBody()
+        {
+            int result = 0;
+            for (int i = 0; i < Iterations; i++)
+            {
+                // perform some operation that is completely independent from
+                // other threads and therefore should scale perfectly if given
+                // a dedicated thread.
+                for (int j = 0; j < 100; j++)
+                {
+                    result = result ^ i ^ j;
+                }
+            }
+            return result;
+        }
+    }
+}