Added support for true synchronous unary call and added some performance tests.

pull/722/head
Jan Tattermusch 10 years ago
parent 7c15ee88a2
commit 50faa8f78b
  1. 11
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 139
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  4. 27
      src/csharp/Grpc.Core/Calls.cs
  5. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  6. 25
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  7. 10
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  8. 68
      src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
  9. 11
      src/csharp/Grpc.IntegrationTesting/Client.cs
  10. 25
      src/csharp/ext/grpc_csharp_ext.c

@ -101,15 +101,8 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(unaryEchoStringMethod, channel);
BenchmarkUtil.RunBenchmark(100, 1000,
var stopwatch = new Stopwatch(); () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
stopwatch.Start();
for (int i = 0; i < 1000; i++)
{
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
} }
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();

@ -41,6 +41,7 @@
<Compile Include="ServerTest.cs" /> <Compile Include="ServerTest.cs" />
<Compile Include="GrpcEnvironmentTest.cs" /> <Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" /> <Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>

@ -0,0 +1,139 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
using System.Runtime.InteropServices;
namespace Grpc.Core.Tests
{
public class PInvokeTest
{
int counter;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void CompletionQueueCreateDestroyBenchmark()
{
BenchmarkUtil.RunBenchmark(
100000, 1000000,
() => {
CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
}
);
}
/// <summary>
/// Approximate results:
/// (mono ~80ns)
/// </summary>
[Test]
public void NativeCallbackBenchmark()
{
CompletionCallbackDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
1000000, 10000000,
() => {
grpcsharp_test_callback(handler);
}
);
Assert.AreNotEqual(0, counter);
}
/// <summary>
/// Creating a new native-to-managed callback has significant overhead
/// compared to using an existing one. We need to be aware of this.
/// (~50us on mono)
/// </summary>
[Test]
public void NewNativeCallbackBenchmark()
{
counter = 0;
BenchmarkUtil.RunBenchmark(
10000, 10000,
() => {
grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
}
);
Assert.AreNotEqual(0, counter);
}
/// <summary>
/// Tests overhead of a simple PInvoke call.
/// </summary>
[Test]
public void NopPInvokeBenchmark()
{
CompletionCallbackDelegate handler = Handler;
BenchmarkUtil.RunBenchmark(
1000000, 100000000,
() => {
grpcsharp_test_nop(IntPtr.Zero);
}
);
}
private void Handler(GRPCOpError op, IntPtr ptr) {
counter ++;
}
}
}

@ -47,19 +47,22 @@ namespace Grpc.Core
{ {
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
return asyncCall.UnaryCall(call.Channel, call.MethodName, req);
//TODO: implement this in real synchronous style. //TODO: implement this in real synchronous style.
try { // try {
return AsyncUnaryCall(call, req, token).Result; // return AsyncUnaryCall(call, req, token).Result;
} catch(AggregateException ae) { // } catch(AggregateException ae) {
foreach (var e in ae.InnerExceptions) // foreach (var e in ae.InnerExceptions)
{ // {
if (e is RpcException) // if (e is RpcException)
{ // {
throw e; // throw e;
} // }
} // }
throw; // throw;
} // }
} }
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)

@ -62,6 +62,7 @@
<Compile Include="Internal\ClientStreamingInputObserver.cs" /> <Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" /> <Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" /> <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>

@ -112,6 +112,31 @@ namespace Grpc.Core.Internal
InitializeInternal(call, true); InitializeInternal(call, true);
} }
public TRead UnaryCall(Channel channel, String methodName, TWrite msg)
{
using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
// TODO: handle serialization error...
byte[] payload = serializer(msg);
unaryResponseTcs = new TaskCompletionSource<TRead>();
lock (myLock)
{
Initialize(channel, cq, methodName);
started = true;
halfcloseRequested = true;
readingDone = true;
}
call.BlockingUnary(cq, payload, unaryResponseHandler);
// task should be finished once BlockingUnary returns.
return unaryResponseTcs.Task.Result;
// TODO: unwrap aggregate exception...
}
}
public Task<TRead> UnaryCallAsync(TWrite msg) public Task<TRead> UnaryCallAsync(TWrite msg)
{ {
lock (myLock) lock (myLock)

@ -62,6 +62,11 @@ namespace Grpc.Core.Internal
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
@ -113,6 +118,11 @@ namespace Grpc.Core.Internal
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
} }
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length));
}
public void StartClientStreaming(CompletionCallbackDelegate callback) public void StartClientStreaming(CompletionCallbackDelegate callback)
{ {
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));

@ -0,0 +1,68 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace Grpc.Core.Utils
{
public static class BenchmarkUtil
{
/// <summary>
/// Runs a simple benchmark preceded by warmup phase.
/// </summary>
public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action)
{
Console.WriteLine("Warmup iterations: " + warmupIterations);
for (int i = 0; i < warmupIterations; i++)
{
action();
}
Console.WriteLine("Benchmark iterations: " + benchmarkIterations);
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < benchmarkIterations; i++)
{
action();
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds));
}
}
}

@ -33,7 +33,9 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.ProtocolBuffers; using Google.ProtocolBuffers;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Utils; using Grpc.Core.Utils;
@ -128,6 +130,9 @@ namespace Grpc.IntegrationTesting
case "empty_stream": case "empty_stream":
RunEmptyStream(client); RunEmptyStream(client);
break; break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
default: default:
throw new ArgumentException("Unknown test case " + testCase); throw new ArgumentException("Unknown test case " + testCase);
} }
@ -267,6 +272,12 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!"); Console.WriteLine("Passed!");
} }
// This is not an official interop test, but it's useful.
private void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client)
{
BenchmarkUtil.RunBenchmark(10000, 10000,
() => { client.EmptyCall(Empty.DefaultInstance);});
}
private Payload CreateZerosPayload(int size) { private Payload CreateZerosPayload(int size) {
return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build();

@ -343,6 +343,18 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
} }
/* Synchronous unary call */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call, grpc_completion_queue *dedicated_cq, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, send_buffer_len) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(dedicated_cq);
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_QUEUE_SHUTDOWN);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call, grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) { callback_funcptr callback) {
@ -566,3 +578,16 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx); &(ctx->server_rpc_new.request_metadata), cq, ctx);
} }
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(callback_funcptr callback) {
callback(GRPC_OP_OK, NULL);
}
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE
grpcsharp_test_nop(void *ptr) {
return ptr;
}

Loading…
Cancel
Save