Merge pull request #13521 from jtattermusch/csharp_requestcallcontext_pooling

C#: Reuse RequestCallContextSafeHandle objects by pooling them.
pull/13562/head^2
Jan Tattermusch 7 years ago committed by GitHub
commit 7a64e6964e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
  2. 28
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  3. 8
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  4. 2
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  5. 3
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  6. 22
      src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
  7. 3
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  8. 1
      src/csharp/Grpc.Core/Server.cs
  9. 4
      src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
  10. 2
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
  11. 13
      src/csharp/ext/grpc_csharp_ext.c

@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests
public void CreateAsyncAndShutdown()
{
var env = GrpcEnvironment.AddRef();
var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create()));
var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create()));
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();

@ -35,6 +35,8 @@ namespace Grpc.Core
const int MinDefaultThreadPoolSize = 4;
const int DefaultBatchContextPoolSharedCapacity = 10000;
const int DefaultBatchContextPoolThreadLocalCapacity = 64;
const int DefaultRequestCallContextPoolSharedCapacity = 10000;
const int DefaultRequestCallContextPoolThreadLocalCapacity = 64;
static object staticLock = new object();
static GrpcEnvironment instance;
@ -44,12 +46,15 @@ namespace Grpc.Core
static bool inlineHandlers;
static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity;
static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity;
static int requestCallContextPoolSharedCapacity = DefaultRequestCallContextPoolSharedCapacity;
static int requestCallContextPoolThreadLocalCapacity = DefaultRequestCallContextPoolThreadLocalCapacity;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true);
readonly IObjectPool<BatchContextSafeHandle> batchContextPool;
readonly IObjectPool<RequestCallContextSafeHandle> requestCallContextPool;
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
@ -262,6 +267,26 @@ namespace Grpc.Core
}
}
/// <summary>
/// Sets the parameters for a pool that caches request call context instances. Reusing request call context instances
/// instead of creating a new one for every requested call in C core helps reducing the GC pressure.
/// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// This is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
public static void SetRequestCallContextPoolParams(int sharedCapacity, int threadLocalCapacity)
{
lock (staticLock)
{
GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");
requestCallContextPoolSharedCapacity = sharedCapacity;
requestCallContextPoolThreadLocalCapacity = threadLocalCapacity;
}
}
/// <summary>
/// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic.
/// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
@ -275,6 +300,7 @@ namespace Grpc.Core
{
GrpcNativeInit();
batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(this.requestCallContextPool), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
@ -292,6 +318,8 @@ namespace Grpc.Core
internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool;
internal IObjectPool<RequestCallContextSafeHandle> RequestCallContextPool => requestCallContextPool;
internal bool IsAlive
{
get

@ -37,14 +37,16 @@ namespace Grpc.Core.Internal
readonly GrpcEnvironment environment;
readonly Func<BatchContextSafeHandle> batchContextFactory;
readonly Func<RequestCallContextSafeHandle> requestCallContextFactory;
readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer());
SpinLock spinLock = new SpinLock(Debugger.IsAttached);
IntPtr lastRegisteredKey; // only for testing
public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory)
public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory, Func<RequestCallContextSafeHandle> requestCallContextFactory)
{
this.environment = GrpcPreconditions.CheckNotNull(environment);
this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory);
this.requestCallContextFactory = GrpcPreconditions.CheckNotNull(requestCallContextFactory);
}
public void Register(IntPtr key, IOpCompletionCallback callback)
@ -73,10 +75,12 @@ namespace Grpc.Core.Internal
return ctx;
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
public RequestCallContextSafeHandle RegisterRequestCallCompletion(RequestCallCompletionDelegate callback)
{
var ctx = requestCallContextFactory();
ctx.CompletionCallback = callback;
Register(ctx.Handle, ctx);
return ctx;
}
public IOpCompletionCallback Extract(IntPtr key)

@ -219,7 +219,7 @@ namespace Grpc.Core.Internal
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => environment.RequestCallContextPool.Lease());
list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
}
return list.AsReadOnly();

@ -61,6 +61,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_request_call_context_host_delegate grpcsharp_request_call_context_host;
public readonly Delegates.grpcsharp_request_call_context_deadline_delegate grpcsharp_request_call_context_deadline;
public readonly Delegates.grpcsharp_request_call_context_request_metadata_delegate grpcsharp_request_call_context_request_metadata;
public readonly Delegates.grpcsharp_request_call_context_reset_delegate grpcsharp_request_call_context_reset;
public readonly Delegates.grpcsharp_request_call_context_destroy_delegate grpcsharp_request_call_context_destroy;
public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create;
@ -179,6 +180,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_request_call_context_host = GetMethodDelegate<Delegates.grpcsharp_request_call_context_host_delegate>(library);
this.grpcsharp_request_call_context_deadline = GetMethodDelegate<Delegates.grpcsharp_request_call_context_deadline_delegate>(library);
this.grpcsharp_request_call_context_request_metadata = GetMethodDelegate<Delegates.grpcsharp_request_call_context_request_metadata_delegate>(library);
this.grpcsharp_request_call_context_reset = GetMethodDelegate<Delegates.grpcsharp_request_call_context_reset_delegate>(library);
this.grpcsharp_request_call_context_destroy = GetMethodDelegate<Delegates.grpcsharp_request_call_context_destroy_delegate>(library);
this.grpcsharp_composite_call_credentials_create = GetMethodDelegate<Delegates.grpcsharp_composite_call_credentials_create_delegate>(library);
@ -322,6 +324,7 @@ namespace Grpc.Core.Internal
public delegate IntPtr grpcsharp_request_call_context_host_delegate(RequestCallContextSafeHandle ctx, out UIntPtr hostLength);
public delegate Timespec grpcsharp_request_call_context_deadline_delegate(RequestCallContextSafeHandle ctx);
public delegate IntPtr grpcsharp_request_call_context_request_metadata_delegate(RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_request_call_context_reset_delegate(RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_request_call_context_destroy_delegate(IntPtr ctx);
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);

@ -30,14 +30,17 @@ namespace Grpc.Core.Internal
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>();
IObjectPool<RequestCallContextSafeHandle> ownedByPool;
private RequestCallContextSafeHandle()
{
}
public static RequestCallContextSafeHandle Create()
public static RequestCallContextSafeHandle Create(IObjectPool<RequestCallContextSafeHandle> ownedByPool = null)
{
return Native.grpcsharp_request_call_context_create();
var ctx = Native.grpcsharp_request_call_context_create();
ctx.ownedByPool = ownedByPool;
return ctx;
}
public IntPtr Handle
@ -71,6 +74,19 @@ namespace Grpc.Core.Internal
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
public void Recycle()
{
if (ownedByPool != null)
{
Native.grpcsharp_request_call_context_reset(this);
ownedByPool.Return(this);
}
else
{
Dispose();
}
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_request_call_context_destroy(handle);
@ -90,7 +106,7 @@ namespace Grpc.Core.Internal
finally
{
CompletionCallback = null;
Dispose();
Recycle();
}
}
}

@ -75,8 +75,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = RequestCallContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterRequestCallCompletion(ctx, callback);
var ctx = completionQueue.CompletionRegistry.RegisterRequestCallCompletion(callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
}

@ -300,6 +300,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
// TODO(jtattermusch): avoid unnecessary delegate allocation
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}

@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks
public void Run(int threadCount, int iterations, bool useSharedRegistry)
{
Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry));
CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null;
CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create(), () => RequestCallContextSafeHandle.Create()) : null;
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry));
threadedBenchmark.Run();
// TODO: parametrize by number of pending completions
@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry)
{
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create());
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => throw new NotImplementedException(), () => throw new NotImplementedException());
var ctx = BatchContextSafeHandle.Create();
var stopwatch = Stopwatch.StartNew();

@ -52,7 +52,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, int payloadSize)
{
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease(), () => throw new NotImplementedException());
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);

@ -226,17 +226,22 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_request_call_context_destroy(grpcsharp_request_call_context* ctx) {
if (!ctx) {
return;
}
grpcsharp_request_call_context_reset(grpcsharp_request_call_context* ctx) {
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
supposed
to take its ownership. */
grpc_call_details_destroy(&(ctx->call_details));
grpcsharp_metadata_array_destroy_metadata_only(&(ctx->request_metadata));
memset(ctx, 0, sizeof(grpcsharp_request_call_context));
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_request_call_context_destroy(grpcsharp_request_call_context* ctx) {
if (!ctx) {
return;
}
grpcsharp_request_call_context_reset(ctx);
gpr_free(ctx);
}

Loading…
Cancel
Save