diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index 7529c44c4e6..43f816bb1c6 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -63,7 +63,7 @@ namespace Grpc.Core.Tests [Ignore("Prevent running on Jenkins")] public void NativeCallbackBenchmark() { - OpCompletionDelegate handler = Handler; + NativeCallbackTestDelegate handler = Handler; counter = 0; BenchmarkUtil.RunBenchmark( @@ -91,7 +91,7 @@ namespace Grpc.Core.Tests 10000, 10000, () => { - Native.grpcsharp_test_callback(new OpCompletionDelegate(Handler)); + Native.grpcsharp_test_callback(new NativeCallbackTestDelegate(Handler)); }); Assert.AreNotEqual(0, counter); } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index cd5e3d8911a..d839413a808 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -20,15 +20,22 @@ using System; using System.Runtime.InteropServices; using System.Text; using Grpc.Core; +using Grpc.Core.Logging; namespace Grpc.Core.Internal { + internal interface IOpCompletionCallback + { + void OnComplete(bool success); + } + /// /// grpcsharp_batch_context /// - internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid + internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback { static readonly NativeMethods Native = NativeMethods.Get(); + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); private BatchContextSafeHandle() { @@ -47,6 +54,8 @@ namespace Grpc.Core.Internal } } + public BatchCompletionDelegate CompletionCallback { get; set; } + // Gets data of recv_initial_metadata completion. public Metadata GetReceivedInitialMetadata() { @@ -92,5 +101,22 @@ namespace Grpc.Core.Internal Native.grpcsharp_batch_context_destroy(handle); return true; } + + void IOpCompletionCallback.OnComplete(bool success) + { + try + { + CompletionCallback(success, this); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking batch completion delegate."); + } + finally + { + CompletionCallback = null; + Dispose(); + } + } } } diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index 1102c8d14fb..33734864bdd 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -25,8 +25,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - internal delegate void OpCompletionDelegate(bool success); - internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx); internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx); @@ -36,7 +34,7 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); readonly GrpcEnvironment environment; - readonly Dictionary dict = new Dictionary(new IntPtrComparer()); + readonly Dictionary dict = new Dictionary(new IntPtrComparer()); readonly object myLock = new object(); IntPtr lastRegisteredKey; // only for testing @@ -45,7 +43,7 @@ namespace Grpc.Core.Internal this.environment = environment; } - public void Register(IntPtr key, OpCompletionDelegate callback) + public void Register(IntPtr key, IOpCompletionCallback callback) { environment.DebugStats.PendingBatchCompletions.Increment(); lock (myLock) @@ -57,21 +55,19 @@ namespace Grpc.Core.Internal public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback) { - // TODO(jtattermusch): get rid of new delegate creation here - OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback)); - Register(ctx.Handle, opCallback); + ctx.CompletionCallback = callback; + Register(ctx.Handle, ctx); } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) { - // TODO(jtattermusch): get rid of new delegate creation here - OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback)); - Register(ctx.Handle, opCallback); + ctx.CompletionCallback = callback; + Register(ctx.Handle, ctx); } - public OpCompletionDelegate Extract(IntPtr key) + public IOpCompletionCallback Extract(IntPtr key) { - OpCompletionDelegate value = null; + IOpCompletionCallback value = null; lock (myLock) { value = dict[key]; @@ -89,44 +85,6 @@ namespace Grpc.Core.Internal get { return this.lastRegisteredKey; } } - private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback) - { - try - { - callback(success, ctx); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking batch completion delegate."); - } - finally - { - if (ctx != null) - { - ctx.Dispose(); - } - } - } - - private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) - { - try - { - callback(success, ctx); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking request call completion delegate."); - } - finally - { - if (ctx != null) - { - ctx.Dispose(); - } - } - } - /// /// IntPtr doesn't implement IEquatable{IntPtr} so we need to use custom comparer to avoid boxing. /// diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index f7f723c00bb..bd0229a9dd4 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); - this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); - this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); + this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true)); + this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false)); } public void Start() @@ -225,11 +225,11 @@ namespace Grpc.Core.Internal return list.AsReadOnly(); } - private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success) { try { - callback(success); + callback.OnComplete(success); } catch (Exception e) { diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 22faa19d9bf..d517252cfe6 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -29,6 +29,8 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { + internal delegate void NativeCallbackTestDelegate(bool success); + /// /// Provides access to all native methods provided by NativeExtension. /// An extra level of indirection is added to P/Invoke calls to allow intelligent loading @@ -420,7 +422,7 @@ namespace Grpc.Core.Internal public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock); public delegate int gprsharp_sizeof_timespec_delegate(); - public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); + public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] NativeCallbackTestDelegate callback); public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr); public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant); } diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs index b7af0c102d2..09f5c3e4526 100644 --- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -19,15 +19,17 @@ using System; using System.Runtime.InteropServices; using Grpc.Core; +using Grpc.Core.Logging; namespace Grpc.Core.Internal { /// /// grpcsharp_request_call_context /// - internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid + internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback { static readonly NativeMethods Native = NativeMethods.Get(); + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); private RequestCallContextSafeHandle() { @@ -46,6 +48,8 @@ namespace Grpc.Core.Internal } } + public RequestCallCompletionDelegate CompletionCallback { get; set; } + // Gets data of server_rpc_new completion. public ServerRpcNew GetServerRpcNew(Server server) { @@ -72,5 +76,22 @@ namespace Grpc.Core.Internal Native.grpcsharp_request_call_context_destroy(handle); return true; } + + void IOpCompletionCallback.OnComplete(bool success) + { + try + { + CompletionCallback(success, this); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking request call completion delegate."); + } + finally + { + CompletionCallback = null; + Dispose(); + } + } } } diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs index 810de119a8b..0e2fb070f19 100644 --- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -52,18 +52,26 @@ namespace Grpc.Microbenchmarks { var completionRegistry = new CompletionRegistry(environment); var ctx = BatchContextSafeHandle.Create(); - var completionDelegate = new OpCompletionDelegate((success) => {}); var stopwatch = Stopwatch.StartNew(); for (int i = 0; i < iterations; i++) { - completionRegistry.Register(ctx.DangerousGetHandle(), completionDelegate); + completionRegistry.Register(ctx.Handle, ctx); var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey); + // NOTE: we are not calling the callback to avoid disposing ctx. } stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); ctx.Dispose(); } + + private class NopCompletionCallback : IOpCompletionCallback + { + public void OnComplete(bool success) + { + + } + } } } diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index de67874580d..6a8a29d7701 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -68,7 +68,7 @@ namespace Grpc.Microbenchmarks { call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false); var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey); - callback(true); + callback.OnComplete(true); } stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);