diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 7e564a2fba5..39be35c2190 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -101,15 +101,8 @@ namespace Grpc.Core.Tests using (Channel channel = new Channel(host + ":" + port)) { var call = new Call(unaryEchoStringMethod, channel); - - var stopwatch = new Stopwatch(); - stopwatch.Start(); - for (int i = 0; i < 1000; i++) - { - Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); - } - stopwatch.Stop(); - Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + BenchmarkUtil.RunBenchmark(100, 1000, + () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); } server.ShutdownAsync().Wait(); diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 687be3c0cb6..a365320f052 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -41,6 +41,7 @@ + diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 8d3aef946a6..596918c231c 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -41,14 +41,16 @@ namespace Grpc.Core.Tests public class GrpcEnvironmentTest { [Test] - public void InitializeAndShutdownGrpcEnvironment() { + public void InitializeAndShutdownGrpcEnvironment() + { GrpcEnvironment.Initialize(); Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue); GrpcEnvironment.Shutdown(); } [Test] - public void SubsequentInvocations() { + public void SubsequentInvocations() + { GrpcEnvironment.Initialize(); GrpcEnvironment.Initialize(); GrpcEnvironment.Shutdown(); @@ -56,7 +58,8 @@ namespace Grpc.Core.Tests } [Test] - public void InitializeAfterShutdown() { + public void InitializeAfterShutdown() + { GrpcEnvironment.Initialize(); var tp1 = GrpcEnvironment.ThreadPool; GrpcEnvironment.Shutdown(); diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs new file mode 100644 index 00000000000..282d521ba37 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -0,0 +1,145 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Tests +{ + public class PInvokeTest + { + int counter; + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_test_nop(IntPtr ptr); + + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + } + + [TestFixtureTearDown] + public void Cleanup() + { + GrpcEnvironment.Shutdown(); + } + + /// + /// (~1.26us .NET Windows) + /// + [Test] + public void CompletionQueueCreateDestroyBenchmark() + { + BenchmarkUtil.RunBenchmark( + 100000, 1000000, + () => { + CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create(); + cq.Dispose(); + } + ); + } + + + /// + /// Approximate results: + /// (~80ns Mono Linux) + /// (~110ns .NET Windows) + /// + [Test] + public void NativeCallbackBenchmark() + { + CompletionCallbackDelegate handler = Handler; + + counter = 0; + BenchmarkUtil.RunBenchmark( + 1000000, 10000000, + () => { + grpcsharp_test_callback(handler); + } + ); + Assert.AreNotEqual(0, counter); + } + + /// + /// Creating a new native-to-managed callback has significant overhead + /// compared to using an existing one. We need to be aware of this. + /// (~50us on Mono Linux!!!) + /// (~1.1us on .NET Windows) + /// + [Test] + public void NewNativeCallbackBenchmark() + { + counter = 0; + BenchmarkUtil.RunBenchmark( + 10000, 10000, + () => { + grpcsharp_test_callback(new CompletionCallbackDelegate(Handler)); + } + ); + Assert.AreNotEqual(0, counter); + } + + /// + /// Tests overhead of a simple PInvoke call. + /// (~46ns .NET Windows) + /// + [Test] + public void NopPInvokeBenchmark() + { + CompletionCallbackDelegate handler = Handler; + + BenchmarkUtil.RunBenchmark( + 1000000, 100000000, + () => { + grpcsharp_test_nop(IntPtr.Zero); + } + ); + } + + private void Handler(GRPCOpError op, IntPtr ptr) { + counter ++; + } + } +} + diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index b67332676ac..ee2208e5c25 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -47,19 +47,8 @@ namespace Grpc.Core { public static TResponse BlockingUnaryCall(Call call, TRequest req, CancellationToken token) { - //TODO: implement this in real synchronous style. - try { - return AsyncUnaryCall(call, req, token).Result; - } catch(AggregateException ae) { - foreach (var e in ae.InnerExceptions) - { - if (e is RpcException) - { - throw e; - } - } - throw; - } + var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); + return asyncCall.UnaryCall(call.Channel, call.MethodName, req); } public static async Task AsyncUnaryCall(Call call, TRequest req, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 4ad32e10e43..135ce26cbd2 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -62,6 +62,8 @@ + + diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 5e96092e270..dadc9ab76cf 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -38,6 +38,7 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -112,6 +113,36 @@ namespace Grpc.Core.Internal InitializeInternal(call, true); } + public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + { + using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) + { + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource(); + + lock (myLock) + { + Initialize(channel, cq, methodName); + started = true; + halfcloseRequested = true; + readingDone = true; + } + call.BlockingUnary(cq, payload, unaryResponseHandler); + + try + { + // Once the blocking call returns, the result should be available synchronously. + return unaryResponseTcs.Task.Result; + } + catch (AggregateException ae) + { + throw ExceptionHelper.UnwrapRpcException(ae); + } + } + } + public Task UnaryCallAsync(TWrite msg) { lock (myLock) diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 659a383b4bd..1c0bc98f062 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -62,6 +62,11 @@ namespace Grpc.Core.Internal [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, byte[] send_buffer, UIntPtr send_buffer_len); + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); @@ -113,6 +118,11 @@ namespace Grpc.Core.Internal AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } + public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) + { + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length)); + } + public void StartClientStreaming(CompletionCallbackDelegate callback) { AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs index 5a9d0039bc9..e1cf64ca56a 100644 --- a/src/csharp/Grpc.Core/RpcException.cs +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -49,7 +49,8 @@ namespace Grpc.Core this.status = status; } - public Status Status { + public Status Status + { get { return status; diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs index 1296947f34d..289f97aecee 100644 --- a/src/csharp/Grpc.Core/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -111,6 +111,8 @@ namespace Grpc.Core var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver()); + // TODO: this makes the call finish before all reads can be done which causes trouble + // in AsyncCall.HandleReadFinished callback. Revisit this. asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); finishedTask.Wait(); diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs new file mode 100644 index 00000000000..3f0dae84cf8 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs @@ -0,0 +1,68 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Grpc.Core.Utils +{ + public static class BenchmarkUtil + { + /// + /// Runs a simple benchmark preceded by warmup phase. + /// + public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action) + { + Console.WriteLine("Warmup iterations: " + warmupIterations); + for (int i = 0; i < warmupIterations; i++) + { + action(); + } + + Console.WriteLine("Benchmark iterations: " + benchmarkIterations); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < benchmarkIterations; i++) + { + action(); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds)); + } + } +} + diff --git a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs b/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs new file mode 100644 index 00000000000..18702e1cc42 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs @@ -0,0 +1,57 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core.Utils +{ + public static class ExceptionHelper + { + /// + /// If inner exceptions contain RpcException, rethrows it. + /// Otherwise, rethrows the original aggregate exception. + /// Always throws, the exception return type is here only to make the. + /// + public static Exception UnwrapRpcException(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw ae; + } + } +} + diff --git a/src/csharp/Grpc.IntegrationTesting/Client.cs b/src/csharp/Grpc.IntegrationTesting/Client.cs index bb650a112de..0c70744cea5 100644 --- a/src/csharp/Grpc.IntegrationTesting/Client.cs +++ b/src/csharp/Grpc.IntegrationTesting/Client.cs @@ -33,7 +33,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Text.RegularExpressions; +using System.Threading.Tasks; using Google.ProtocolBuffers; using Grpc.Core; using Grpc.Core.Utils; @@ -128,6 +130,9 @@ namespace Grpc.IntegrationTesting case "empty_stream": RunEmptyStream(client); break; + case "benchmark_empty_unary": + RunBenchmarkEmptyUnary(client); + break; default: throw new ArgumentException("Unknown test case " + testCase); } @@ -267,6 +272,12 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } + // This is not an official interop test, but it's useful. + private void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client) + { + BenchmarkUtil.RunBenchmark(10000, 10000, + () => { client.EmptyCall(Empty.DefaultInstance);}); + } private Payload CreateZerosPayload(int size) { return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 5f9f22cab10..18e0431e3b2 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -343,6 +343,18 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } +/* Synchronous unary call */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_call_blocking_unary(grpc_call *call, grpc_completion_queue *dedicated_cq, callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, send_buffer_len) == GRPC_CALL_OK); + + /* TODO: we would like to use pluck, but we don't know the tag */ + GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_OP_COMPLETE); + grpc_completion_queue_shutdown(dedicated_cq); + GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GRPC_QUEUE_SHUTDOWN); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, callback_funcptr callback) { @@ -566,3 +578,16 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), &(ctx->server_rpc_new.request_metadata), cq, ctx); } + + +/* For testing */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_test_callback(callback_funcptr callback) { + callback(GRPC_OP_OK, NULL); +} + +/* For testing */ +GPR_EXPORT void *GPR_CALLTYPE +grpcsharp_test_nop(void *ptr) { + return ptr; +}