Merge pull request #13444 from jtattermusch/avoid_delegate_alloc_completion_registry

Avoid unnecessary delegate alloc in completion registry
pull/13462/head
Jan Tattermusch 7 years ago committed by GitHub
commit f70ecdb640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  2. 28
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  3. 58
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  4. 8
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  5. 4
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  6. 23
      src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
  7. 12
      src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
  8. 2
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs

@ -63,7 +63,7 @@ namespace Grpc.Core.Tests
[Ignore("Prevent running on Jenkins")]
public void NativeCallbackBenchmark()
{
OpCompletionDelegate handler = Handler;
NativeCallbackTestDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@ -91,7 +91,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
Native.grpcsharp_test_callback(new OpCompletionDelegate(Handler));
Native.grpcsharp_test_callback(new NativeCallbackTestDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}

@ -20,15 +20,22 @@ using System;
using System.Runtime.InteropServices;
using System.Text;
using Grpc.Core;
using Grpc.Core.Logging;
namespace Grpc.Core.Internal
{
internal interface IOpCompletionCallback
{
void OnComplete(bool success);
}
/// <summary>
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
private BatchContextSafeHandle()
{
@ -47,6 +54,8 @@ namespace Grpc.Core.Internal
}
}
public BatchCompletionDelegate CompletionCallback { get; set; }
// Gets data of recv_initial_metadata completion.
public Metadata GetReceivedInitialMetadata()
{
@ -92,5 +101,22 @@ namespace Grpc.Core.Internal
Native.grpcsharp_batch_context_destroy(handle);
return true;
}
void IOpCompletionCallback.OnComplete(bool success)
{
try
{
CompletionCallback(success, this);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking batch completion delegate.");
}
finally
{
CompletionCallback = null;
Dispose();
}
}
}
}

@ -25,8 +25,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void OpCompletionDelegate(bool success);
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx);
@ -36,7 +34,7 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
readonly GrpcEnvironment environment;
readonly Dictionary<IntPtr, OpCompletionDelegate> dict = new Dictionary<IntPtr, OpCompletionDelegate>(new IntPtrComparer());
readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer());
readonly object myLock = new object();
IntPtr lastRegisteredKey; // only for testing
@ -45,7 +43,7 @@ namespace Grpc.Core.Internal
this.environment = environment;
}
public void Register(IntPtr key, OpCompletionDelegate callback)
public void Register(IntPtr key, IOpCompletionCallback callback)
{
environment.DebugStats.PendingBatchCompletions.Increment();
lock (myLock)
@ -57,21 +55,19 @@ namespace Grpc.Core.Internal
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
// TODO(jtattermusch): get rid of new delegate creation here
OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
ctx.CompletionCallback = callback;
Register(ctx.Handle, ctx);
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
// TODO(jtattermusch): get rid of new delegate creation here
OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
ctx.CompletionCallback = callback;
Register(ctx.Handle, ctx);
}
public OpCompletionDelegate Extract(IntPtr key)
public IOpCompletionCallback Extract(IntPtr key)
{
OpCompletionDelegate value = null;
IOpCompletionCallback value = null;
lock (myLock)
{
value = dict[key];
@ -89,44 +85,6 @@ namespace Grpc.Core.Internal
get { return this.lastRegisteredKey; }
}
private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking batch completion delegate.");
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking request call completion delegate.");
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
/// <summary>
/// IntPtr doesn't implement <c>IEquatable{IntPtr}</c> so we need to use custom comparer to avoid boxing.
/// </summary>

@ -68,8 +68,8 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, true));
this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((IOpCompletionCallback) callback, false));
}
public void Start()
@ -225,11 +225,11 @@ namespace Grpc.Core.Internal
return list.AsReadOnly();
}
private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
private void RunCompletionQueueEventCallback(IOpCompletionCallback callback, bool success)
{
try
{
callback(success);
callback.OnComplete(success);
}
catch (Exception e)
{

@ -29,6 +29,8 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void NativeCallbackTestDelegate(bool success);
/// <summary>
/// Provides access to all native methods provided by <c>NativeExtension</c>.
/// An extra level of indirection is added to P/Invoke calls to allow intelligent loading
@ -420,7 +422,7 @@ namespace Grpc.Core.Internal
public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock);
public delegate int gprsharp_sizeof_timespec_delegate();
public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] NativeCallbackTestDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
public delegate void grpcsharp_test_override_method_delegate(string methodName, string variant);
}

@ -19,15 +19,17 @@
using System;
using System.Runtime.InteropServices;
using Grpc.Core;
using Grpc.Core.Logging;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpcsharp_request_call_context
/// </summary>
internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid
internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>();
private RequestCallContextSafeHandle()
{
@ -46,6 +48,8 @@ namespace Grpc.Core.Internal
}
}
public RequestCallCompletionDelegate CompletionCallback { get; set; }
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
@ -72,5 +76,22 @@ namespace Grpc.Core.Internal
Native.grpcsharp_request_call_context_destroy(handle);
return true;
}
void IOpCompletionCallback.OnComplete(bool success)
{
try
{
CompletionCallback(success, this);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking request call completion delegate.");
}
finally
{
CompletionCallback = null;
Dispose();
}
}
}
}

@ -52,18 +52,26 @@ namespace Grpc.Microbenchmarks
{
var completionRegistry = new CompletionRegistry(environment);
var ctx = BatchContextSafeHandle.Create();
var completionDelegate = new OpCompletionDelegate((success) => {});
var stopwatch = Stopwatch.StartNew();
for (int i = 0; i < iterations; i++)
{
completionRegistry.Register(ctx.DangerousGetHandle(), completionDelegate);
completionRegistry.Register(ctx.Handle, ctx);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
// NOTE: we are not calling the callback to avoid disposing ctx.
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
ctx.Dispose();
}
private class NopCompletionCallback : IOpCompletionCallback
{
public void OnComplete(bool success)
{
}
}
}
}

@ -68,7 +68,7 @@ namespace Grpc.Microbenchmarks
{
call.StartSendMessage(sendCompletionHandler, payload, writeFlags, false);
var callback = completionRegistry.Extract(completionRegistry.LastRegisteredKey);
callback(true);
callback.OnComplete(true);
}
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);

Loading…
Cancel
Save