Merge pull request #722 from jtattermusch/csharp_performance

Added support for true synchronous unary call & some performance tests.
pull/785/head
Tim Emiola 10 years ago
commit 517aa0c535
  1. 11
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 9
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  4. 145
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  5. 15
      src/csharp/Grpc.Core/Calls.cs
  6. 2
      src/csharp/Grpc.Core/Grpc.Core.csproj
  7. 31
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  8. 10
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  9. 3
      src/csharp/Grpc.Core/RpcException.cs
  10. 2
      src/csharp/Grpc.Core/ServerCallHandler.cs
  11. 68
      src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
  12. 57
      src/csharp/Grpc.Core/Utils/ExceptionHelper.cs
  13. 11
      src/csharp/Grpc.IntegrationTesting/Client.cs
  14. 25
      src/csharp/ext/grpc_csharp_ext.c

@ -101,15 +101,8 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < 1000; i++)
{
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
BenchmarkUtil.RunBenchmark(100, 1000,
() => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
}
server.ShutdownAsync().Wait();

@ -41,6 +41,7 @@
<Compile Include="ServerTest.cs" />
<Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -41,14 +41,16 @@ namespace Grpc.Core.Tests
public class GrpcEnvironmentTest
{
[Test]
public void InitializeAndShutdownGrpcEnvironment() {
public void InitializeAndShutdownGrpcEnvironment()
{
GrpcEnvironment.Initialize();
Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue);
GrpcEnvironment.Shutdown();
}
[Test]
public void SubsequentInvocations() {
public void SubsequentInvocations()
{
GrpcEnvironment.Initialize();
GrpcEnvironment.Initialize();
GrpcEnvironment.Shutdown();
@ -56,7 +58,8 @@ namespace Grpc.Core.Tests
}
[Test]
public void InitializeAfterShutdown() {
public void InitializeAfterShutdown()
{
GrpcEnvironment.Initialize();
var tp1 = GrpcEnvironment.ThreadPool;
GrpcEnvironment.Shutdown();

@ -0,0 +1,145 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
using System.Runtime.InteropServices;
namespace Grpc.Core.Tests
{
public class PInvokeTest
{
int counter;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Shutdown();
}
/// <summary>
/// (~1.26us .NET Windows)
/// </summary>
[Test]
public void CompletionQueueCreateDestroyBenchmark()
{
BenchmarkUtil.RunBenchmark(
100000, 1000000,
() => {
CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
}
);
}
/// <summary>
/// Approximate results:
/// (~80ns Mono Linux)
/// (~110ns .NET Windows)
/// </summary>
[Test]
public void NativeCallbackBenchmark()
{
CompletionCallbackDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
1000000, 10000000,
() => {
grpcsharp_test_callback(handler);
}
);
Assert.AreNotEqual(0, counter);
}
/// <summary>
/// Creating a new native-to-managed callback has significant overhead
/// compared to using an existing one. We need to be aware of this.
/// (~50us on Mono Linux!!!)
/// (~1.1us on .NET Windows)
/// </summary>
[Test]
public void NewNativeCallbackBenchmark()
{
counter = 0;
BenchmarkUtil.RunBenchmark(
10000, 10000,
() => {
grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
}
);
Assert.AreNotEqual(0, counter);
}
/// <summary>
/// Tests overhead of a simple PInvoke call.
/// (~46ns .NET Windows)
/// </summary>
[Test]
public void NopPInvokeBenchmark()
{
CompletionCallbackDelegate handler = Handler;
BenchmarkUtil.RunBenchmark(
1000000, 100000000,
() => {
grpcsharp_test_nop(IntPtr.Zero);
}
);
}
private void Handler(GRPCOpError op, IntPtr ptr) {
counter ++;
}
}
}

@ -47,19 +47,8 @@ namespace Grpc.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
//TODO: implement this in real synchronous style.
try {
return AsyncUnaryCall(call, req, token).Result;
} catch(AggregateException ae) {
foreach (var e in ae.InnerExceptions)
{
if (e is RpcException)
{
throw e;
}
}
throw;
}
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
return asyncCall.UnaryCall(call.Channel, call.MethodName, req);
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)

@ -62,6 +62,8 @@
<Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -38,6 +38,7 @@ using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@ -112,6 +113,36 @@ namespace Grpc.Core.Internal
InitializeInternal(call, true);
}
public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
// TODO: handle serialization error...
byte[] payload = serializer(msg);
unaryResponseTcs = new TaskCompletionSource<TRead>();
lock (myLock)
{
Initialize(channel, cq, methodName);
started = true;
halfcloseRequested = true;
readingDone = true;
}
call.BlockingUnary(cq, payload, unaryResponseHandler);
try
{
// Once the blocking call returns, the result should be available synchronously.
return unaryResponseTcs.Task.Result;
}
catch (AggregateException ae)
{
throw ExceptionHelper.UnwrapRpcException(ae);
}
}
}
public Task<TRead> UnaryCallAsync(TWrite msg)
{
lock (myLock)

@ -62,6 +62,11 @@ namespace Grpc.Core.Internal
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
@ -113,6 +118,11 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
}
public void StartClientStreaming(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));

@ -49,7 +49,8 @@ namespace Grpc.Core
this.status = status;
}
public Status Status {
public Status Status
{
get
{
return status;

@ -111,6 +111,8 @@ namespace Grpc.Core
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
// TODO: this makes the call finish before all reads can be done which causes trouble
// in AsyncCall.HandleReadFinished callback. Revisit this.
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait();
finishedTask.Wait();

@ -0,0 +1,68 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace Grpc.Core.Utils
{
public static class BenchmarkUtil
{
/// <summary>
/// Runs a simple benchmark preceded by warmup phase.
/// </summary>
public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action)
{
Console.WriteLine("Warmup iterations: " + warmupIterations);
for (int i = 0; i < warmupIterations; i++)
{
action();
}
Console.WriteLine("Benchmark iterations: " + benchmarkIterations);
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < benchmarkIterations; i++)
{
action();
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds));
}
}
}

@ -0,0 +1,57 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core.Utils
{
public static class ExceptionHelper
{
/// <summary>
/// If inner exceptions contain RpcException, rethrows it.
/// Otherwise, rethrows the original aggregate exception.
/// Always throws, the exception return type is here only to make the.
/// </summary>
public static Exception UnwrapRpcException(AggregateException ae) {
foreach (var e in ae.InnerExceptions)
{
if (e is RpcException)
{
throw e;
}
}
throw ae;
}
}
}

@ -33,7 +33,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.ProtocolBuffers;
using Grpc.Core;
using Grpc.Core.Utils;
@ -128,6 +130,9 @@ namespace Grpc.IntegrationTesting
case "empty_stream":
RunEmptyStream(client);
break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
default:
throw new ArgumentException("Unknown test case " + testCase);
}
@ -267,6 +272,12 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
// This is not an official interop test, but it's useful.
private void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client)
{
BenchmarkUtil.RunBenchmark(10000, 10000,
() => { client.EmptyCall(Empty.DefaultInstance);});
}
private Payload CreateZerosPayload(int size) {
return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build();

@ -343,6 +343,18 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Synchronous unary call */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call, grpc_completion_queue *dedicated_cq, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, send_buffer_len) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(dedicated_cq);
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_QUEUE_SHUTDOWN);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) {
@ -566,3 +578,16 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx);
}
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(callback_funcptr callback) {
callback(GRPC_OP_OK, NULL);
}
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE
grpcsharp_test_nop(void *ptr) {
return ptr;
}

Loading…
Cancel
Save