Merge pull request #13476 from jtattermusch/csharp_batchcontext_pooling

Reuse BatchContextSafeHandle objects by pooling them (take two)
pull/13567/head^2
Jan Tattermusch 7 years ago committed by GitHub
commit ba89ad4065
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
  2. 79
      src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
  3. 33
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  4. 25
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 22
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  6. 33
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  7. 3
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  8. 10
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  9. 196
      src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
  10. 2
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  11. 35
      src/csharp/Grpc.Core/Internal/IObjectPool.cs
  12. 3
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  13. 3
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  14. 6
      src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
  15. 5
      src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
  16. 13
      src/csharp/ext/grpc_csharp_ext.c
  17. 1
      src/csharp/tests.json

@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests
public void CreateAsyncAndShutdown()
{
var env = GrpcEnvironment.AddRef();
var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env));
var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create()));
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();

@ -0,0 +1,79 @@
#region Copyright notice and license
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class DefaultObjectPoolTest
{
[Test]
[TestCase(10, 2)]
[TestCase(10, 1)]
[TestCase(0, 2)]
[TestCase(2, 0)]
public void ObjectIsReused(int sharedCapacity, int threadLocalCapacity)
{
var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), sharedCapacity, threadLocalCapacity);
var origLeased = pool.Lease();
pool.Return(origLeased);
Assert.AreSame(origLeased, pool.Lease());
Assert.AreNotSame(origLeased, pool.Lease());
}
[Test]
public void ZeroCapacities()
{
var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 0, 0);
var origLeased = pool.Lease();
pool.Return(origLeased);
Assert.AreNotSame(origLeased, pool.Lease());
}
[Test]
public void DisposeCleansSharedPool()
{
var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0);
var origLeased = pool.Lease();
pool.Return(origLeased);
pool.Dispose();
Assert.AreNotSame(origLeased, pool.Lease());
}
[Test]
public void Constructor()
{
Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2));
Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), -1, 10));
Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1));
}
class TestPooledObject : IDisposable
{
public void Dispose()
{
}
}
}
}

@ -33,6 +33,8 @@ namespace Grpc.Core
public class GrpcEnvironment
{
const int MinDefaultThreadPoolSize = 4;
const int DefaultBatchContextPoolSharedCapacity = 10000;
const int DefaultBatchContextPoolThreadLocalCapacity = 64;
static object staticLock = new object();
static GrpcEnvironment instance;
@ -40,11 +42,14 @@ namespace Grpc.Core
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static bool inlineHandlers;
static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity;
static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true);
readonly IObjectPool<BatchContextSafeHandle> batchContextPool;
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
@ -186,7 +191,7 @@ namespace Grpc.Core
/// <summary>
/// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
/// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
@ -203,7 +208,7 @@ namespace Grpc.Core
/// <summary>
/// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
/// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
@ -237,6 +242,26 @@ namespace Grpc.Core
}
}
/// <summary>
/// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances
/// instead of creating a new one for every C core operation helps reducing the GC pressure.
/// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// This is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity)
{
lock (staticLock)
{
GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");
batchContextPoolSharedCapacity = sharedCapacity;
batchContextPoolThreadLocalCapacity = threadLocalCapacity;
}
}
/// <summary>
/// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic.
/// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
@ -249,6 +274,7 @@ namespace Grpc.Core
private GrpcEnvironment()
{
GrpcNativeInit();
batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
@ -264,6 +290,8 @@ namespace Grpc.Core
}
}
internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool;
internal bool IsAlive
{
get
@ -325,6 +353,7 @@ namespace Grpc.Core
await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false);
await threadPool.StopAsync().ConfigureAwait(false);
batchContextPool.Dispose();
GrpcNativeShutdown();
isShutdown = true;

@ -92,23 +92,28 @@ namespace Grpc.Core.Internal
}
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
using (var ctx = BatchContextSafeHandle.Create())
{
call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
var ctx = details.Channel.Environment.BatchContextPool.Lease();
try
{
using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
try
{
using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
{
HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
}
}
catch (Exception e)
{
HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
Logger.Error(e, "Exception occured while invoking completion delegate.");
}
}
catch (Exception e)
finally
{
Logger.Error(e, "Exception occured while invoking completion delegate.");
ctx.Recycle();
}
}

@ -38,15 +38,18 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
IObjectPool<BatchContextSafeHandle> ownedByPool;
CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle()
{
}
public static BatchContextSafeHandle Create()
public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null)
{
return Native.grpcsharp_batch_context_create();
var ctx = Native.grpcsharp_batch_context_create();
ctx.ownedByPool = ownedByPool;
return ctx;
}
public IntPtr Handle
@ -104,6 +107,19 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
public void Recycle()
{
if (ownedByPool != null)
{
Native.grpcsharp_batch_context_reset(this);
ownedByPool.Return(this);
}
else
{
Dispose();
}
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_batch_context_destroy(handle);
@ -123,7 +139,7 @@ namespace Grpc.Core.Internal
finally
{
completionCallbackData = default(CompletionCallbackData);
Dispose();
Recycle();
}
}

@ -70,8 +70,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags)
.CheckOk();
}
@ -87,8 +86,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
@ -97,8 +95,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk();
}
}
@ -107,8 +104,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
@ -117,8 +113,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk();
}
}
@ -127,8 +122,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@ -138,9 +132,8 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail);
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
@ -151,8 +144,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback);
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@ -161,8 +153,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback);
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@ -171,8 +162,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback);
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@ -181,8 +171,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}

@ -66,8 +66,7 @@ namespace Grpc.Core.Internal
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState)
{
var ctx = BatchContextSafeHandle.Create();
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState);
var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}

@ -36,13 +36,15 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
readonly GrpcEnvironment environment;
readonly Func<BatchContextSafeHandle> batchContextFactory;
readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer());
SpinLock spinLock = new SpinLock(Debugger.IsAttached);
IntPtr lastRegisteredKey; // only for testing
public CompletionRegistry(GrpcEnvironment environment)
public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory)
{
this.environment = environment;
this.environment = GrpcPreconditions.CheckNotNull(environment);
this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory);
}
public void Register(IntPtr key, IOpCompletionCallback callback)
@ -63,10 +65,12 @@ namespace Grpc.Core.Internal
}
}
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state)
public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state)
{
var ctx = batchContextFactory();
ctx.SetCompletionCallback(callback, state);
Register(ctx.Handle, ctx);
return ctx;
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)

@ -0,0 +1,196 @@
#region Copyright notice and license
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Collections.Generic;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of objects that combines a shared pool and a thread local pool.
/// </summary>
internal class DefaultObjectPool<T> : IObjectPool<T>
where T : class, IDisposable
{
readonly object myLock = new object();
readonly Func<T> itemFactory;
// Queue shared between threads, access needs to be synchronized.
readonly Queue<T> sharedQueue;
readonly int sharedCapacity;
readonly ThreadLocal<ThreadLocalData> threadLocalData;
readonly int threadLocalCapacity;
readonly int rentLimit;
bool disposed;
/// <summary>
/// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity.
/// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately
/// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected
/// after the thread that owns them has finished).
/// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease
/// operations.
/// </summary>
public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)
{
GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
this.sharedQueue = new Queue<T>(sharedCapacity);
this.sharedCapacity = sharedCapacity;
this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false);
this.threadLocalCapacity = threadLocalCapacity;
this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1;
}
/// <summary>
/// Leases an item from the pool or creates a new instance if the pool is empty.
/// Attempts to retrieve the item from the thread local pool first.
/// If the thread local pool is empty, the item is taken from the shared pool
/// along with more items that are moved to the thread local pool to avoid
/// prevent acquiring the lock for shared pool too often.
/// The methods should not be called after the pool is disposed, but it won't
/// results in an error to do so (after depleting the items potentially left
/// in the thread local pool, it will continue returning new objects created by the factory).
/// </summary>
public T Lease()
{
var localData = threadLocalData.Value;
if (localData.Queue.Count > 0)
{
return localData.Queue.Dequeue();
}
if (localData.CreateBudget > 0)
{
localData.CreateBudget --;
return itemFactory();
}
int itemsMoved = 0;
T leasedItem = null;
lock(myLock)
{
if (sharedQueue.Count > 0)
{
leasedItem = sharedQueue.Dequeue();
}
while (sharedQueue.Count > 0 && itemsMoved < rentLimit)
{
localData.Queue.Enqueue(sharedQueue.Dequeue());
itemsMoved ++;
}
}
// If the shared pool didn't contain all rentLimit items,
// next time we try to lease we will just create those
// instead of trying to grab them from the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.CreateBudget = rentLimit - itemsMoved;
return leasedItem ?? itemFactory();
}
/// <summary>
/// Returns an item to the pool.
/// Attempts to add the item to the thread local pool first.
/// If the thread local pool is full, item is added to a shared pool,
/// along with half of the items for the thread local pool, which
/// should prevent acquiring the lock for shared pool too often.
/// If called after the pool is disposed, we make best effort not to
/// add anything to the thread local pool and we guarantee not to add
/// anything to the shared pool (items will be disposed instead).
/// </summary>
public void Return(T item)
{
GrpcPreconditions.CheckNotNull(item);
var localData = threadLocalData.Value;
if (localData.Queue.Count < threadLocalCapacity && !disposed)
{
localData.Queue.Enqueue(item);
return;
}
if (localData.DisposeBudget > 0)
{
localData.DisposeBudget --;
item.Dispose();
return;
}
int itemsReturned = 0;
int returnLimit = rentLimit + 1;
lock (myLock)
{
if (sharedQueue.Count < sharedCapacity && !disposed)
{
sharedQueue.Enqueue(item);
itemsReturned ++;
}
while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed)
{
sharedQueue.Enqueue(localData.Queue.Dequeue());
itemsReturned ++;
}
}
// If the shared pool could not accomodate all returnLimit items,
// next time we try to return we will just dispose the item
// instead of trying to return them to the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.DisposeBudget = returnLimit - itemsReturned;
if (itemsReturned == 0)
{
localData.DisposeBudget --;
item.Dispose();
}
}
public void Dispose()
{
lock (myLock)
{
if (!disposed)
{
disposed = true;
while (sharedQueue.Count > 0)
{
sharedQueue.Dequeue().Dispose();
}
}
}
}
class ThreadLocalData
{
public ThreadLocalData(int capacity)
{
this.Queue = new Queue<T>(capacity);
}
public Queue<T> Queue { get; }
public int CreateBudget { get; set; }
public int DisposeBudget { get; set; }
}
}
}

@ -219,7 +219,7 @@ namespace Grpc.Core.Internal
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
var completionRegistry = new CompletionRegistry(environment);
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
}
return list.AsReadOnly();

@ -0,0 +1,35 @@
#region Copyright notice and license
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Collections.Generic;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of objects.
/// </summary>
internal interface IObjectPool<T> : IDisposable
where T : class
{
T Lease();
void Return(T item);
}
}

@ -52,6 +52,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled;
public readonly Delegates.grpcsharp_batch_context_reset_delegate grpcsharp_batch_context_reset;
public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy;
public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create;
@ -169,6 +170,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library);
this.grpcsharp_batch_context_reset = GetMethodDelegate<Delegates.grpcsharp_batch_context_reset_delegate>(library);
this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library);
this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library);
@ -311,6 +313,7 @@ namespace Grpc.Core.Internal
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_reset_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx);
public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate();

@ -64,10 +64,9 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
// TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object,
// but server shutdown isn't worth optimizing right now.
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null);
var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
}

@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks
public void Run(int threadCount, int iterations, bool useSharedRegistry)
{
Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry));
CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null;
CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null;
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry));
threadedBenchmark.Run();
// TODO: parametrize by number of pending completions
@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry)
{
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment);
var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create());
var ctx = BatchContextSafeHandle.Create();
var stopwatch = Stopwatch.StartNew();
@ -64,7 +64,7 @@ namespace Grpc.Microbenchmarks
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
ctx.Dispose();
ctx.Recycle();
}
private class NopCompletionCallback : IOpCompletionCallback

@ -52,10 +52,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, int payloadSize)
{
// TODO(jtattermusch): parametrize by number of pending completions.
// TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle
var completionRegistry = new CompletionRegistry(environment);
var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);

@ -197,10 +197,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array* dest,
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
if (!ctx) {
return;
}
grpcsharp_batch_context_reset(grpcsharp_batch_context* ctx) {
grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_initial_metadata));
@ -216,7 +213,15 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->recv_status_on_client.trailing_metadata));
grpc_slice_unref(ctx->recv_status_on_client.status_details);
memset(ctx, 0, sizeof(grpcsharp_batch_context));
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
if (!ctx) {
return;
}
grpcsharp_batch_context_reset(ctx);
gpr_free(ctx);
}

@ -5,6 +5,7 @@
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",
"Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
"Grpc.Core.Internal.Tests.DefaultObjectPoolTest",
"Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
"Grpc.Core.Internal.Tests.TimespecTest",
"Grpc.Core.Tests.AppDomainUnloadTest",

Loading…
Cancel
Save