Merge pull request #14118 from grpc/v1.9.x

Upmerge changes from v1.9.x
pull/14086/head^2
Jan Tattermusch 7 years ago committed by GitHub
commit 56427c0163
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
  2. 4
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  3. 22
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  4. 11
      src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
  5. 34
      src/csharp/Grpc.Core/Internal/IPooledObject.cs
  6. 23
      src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs

@ -59,6 +59,16 @@ namespace Grpc.Core.Internal.Tests
Assert.AreNotSame(origLeased, pool.Lease()); Assert.AreNotSame(origLeased, pool.Lease());
} }
[Test]
public void LeaseSetsReturnAction()
{
var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0);
var origLeased = pool.Lease();
origLeased.ReturnAction(origLeased);
pool.Dispose();
Assert.AreNotSame(origLeased, pool.Lease());
}
[Test] [Test]
public void Constructor() public void Constructor()
{ {
@ -67,8 +77,14 @@ namespace Grpc.Core.Internal.Tests
Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1)); Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1));
} }
class TestPooledObject : IDisposable class TestPooledObject : IPooledObject<TestPooledObject>
{ {
public Action<TestPooledObject> ReturnAction;
public void SetReturnToPoolAction(Action<TestPooledObject> returnAction)
{
this.ReturnAction = returnAction;
}
public void Dispose() public void Dispose()
{ {

@ -299,8 +299,8 @@ namespace Grpc.Core
private GrpcEnvironment() private GrpcEnvironment()
{ {
GrpcNativeInit(); GrpcNativeInit();
batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(this.requestCallContextPool), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity); requestCallContextPool = new DefaultObjectPool<RequestCallContextSafeHandle>(() => RequestCallContextSafeHandle.Create(), requestCallContextPoolSharedCapacity, requestCallContextPoolThreadLocalCapacity);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start(); threadPool.Start();
} }

@ -33,22 +33,21 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// grpcsharp_batch_context /// grpcsharp_batch_context
/// </summary> /// </summary>
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>
{ {
static readonly NativeMethods Native = NativeMethods.Get(); static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
IObjectPool<BatchContextSafeHandle> ownedByPool; Action<BatchContextSafeHandle> returnToPoolAction;
CompletionCallbackData completionCallbackData; CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle() private BatchContextSafeHandle()
{ {
} }
public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null) public static BatchContextSafeHandle Create()
{ {
var ctx = Native.grpcsharp_batch_context_create(); var ctx = Native.grpcsharp_batch_context_create();
ctx.ownedByPool = ownedByPool;
return ctx; return ctx;
} }
@ -60,6 +59,12 @@ namespace Grpc.Core.Internal
} }
} }
public void SetReturnToPoolAction(Action<BatchContextSafeHandle> returnAction)
{
GrpcPreconditions.CheckState(returnToPoolAction == null);
returnToPoolAction = returnAction;
}
public void SetCompletionCallback(BatchCompletionDelegate callback, object state) public void SetCompletionCallback(BatchCompletionDelegate callback, object state)
{ {
GrpcPreconditions.CheckState(completionCallbackData.Callback == null); GrpcPreconditions.CheckState(completionCallbackData.Callback == null);
@ -109,10 +114,15 @@ namespace Grpc.Core.Internal
public void Recycle() public void Recycle()
{ {
if (ownedByPool != null) if (returnToPoolAction != null)
{ {
Native.grpcsharp_batch_context_reset(this); Native.grpcsharp_batch_context_reset(this);
ownedByPool.Return(this);
var origReturnAction = returnToPoolAction;
// Not clearing all the references to the pool could prevent garbage collection of the pool object
// and thus cause memory leaks.
returnToPoolAction = null;
origReturnAction(this);
} }
else else
{ {

@ -27,9 +27,10 @@ namespace Grpc.Core.Internal
/// Pool of objects that combines a shared pool and a thread local pool. /// Pool of objects that combines a shared pool and a thread local pool.
/// </summary> /// </summary>
internal class DefaultObjectPool<T> : IObjectPool<T> internal class DefaultObjectPool<T> : IObjectPool<T>
where T : class, IDisposable where T : class, IPooledObject<T>
{ {
readonly object myLock = new object(); readonly object myLock = new object();
readonly Action<T> returnAction;
readonly Func<T> itemFactory; readonly Func<T> itemFactory;
// Queue shared between threads, access needs to be synchronized. // Queue shared between threads, access needs to be synchronized.
@ -54,6 +55,7 @@ namespace Grpc.Core.Internal
{ {
GrpcPreconditions.CheckArgument(sharedCapacity >= 0); GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
this.returnAction = Return;
this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
this.sharedQueue = new Queue<T>(sharedCapacity); this.sharedQueue = new Queue<T>(sharedCapacity);
this.sharedCapacity = sharedCapacity; this.sharedCapacity = sharedCapacity;
@ -73,6 +75,13 @@ namespace Grpc.Core.Internal
/// in the thread local pool, it will continue returning new objects created by the factory). /// in the thread local pool, it will continue returning new objects created by the factory).
/// </summary> /// </summary>
public T Lease() public T Lease()
{
var item = LeaseInternal();
item.SetReturnToPoolAction(returnAction);
return item;
}
private T LeaseInternal()
{ {
var localData = threadLocalData.Value; var localData = threadLocalData.Value;
if (localData.Queue.Count > 0) if (localData.Queue.Count > 0)

@ -0,0 +1,34 @@
#region Copyright notice and license
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
namespace Grpc.Core.Internal
{
/// <summary>
/// An object that can be pooled in <c>IObjectPool</c>.
/// </summary>
/// <typeparam name="T"></typeparam>
internal interface IPooledObject<T> : IDisposable
{
/// <summary>
/// Set the action that will be invoked to return a leased object to the pool.
/// </summary>
void SetReturnToPoolAction(Action<T> returnAction);
}
}

@ -20,26 +20,26 @@ using System;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Logging; using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal namespace Grpc.Core.Internal
{ {
/// <summary> /// <summary>
/// grpcsharp_request_call_context /// grpcsharp_request_call_context
/// </summary> /// </summary>
internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<RequestCallContextSafeHandle>
{ {
static readonly NativeMethods Native = NativeMethods.Get(); static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<RequestCallContextSafeHandle>();
IObjectPool<RequestCallContextSafeHandle> ownedByPool; Action<RequestCallContextSafeHandle> returnToPoolAction;
private RequestCallContextSafeHandle() private RequestCallContextSafeHandle()
{ {
} }
public static RequestCallContextSafeHandle Create(IObjectPool<RequestCallContextSafeHandle> ownedByPool = null) public static RequestCallContextSafeHandle Create()
{ {
var ctx = Native.grpcsharp_request_call_context_create(); var ctx = Native.grpcsharp_request_call_context_create();
ctx.ownedByPool = ownedByPool;
return ctx; return ctx;
} }
@ -51,6 +51,12 @@ namespace Grpc.Core.Internal
} }
} }
public void SetReturnToPoolAction(Action<RequestCallContextSafeHandle> returnAction)
{
GrpcPreconditions.CheckState(returnToPoolAction == null);
returnToPoolAction = returnAction;
}
public RequestCallCompletionDelegate CompletionCallback { get; set; } public RequestCallCompletionDelegate CompletionCallback { get; set; }
// Gets data of server_rpc_new completion. // Gets data of server_rpc_new completion.
@ -76,10 +82,15 @@ namespace Grpc.Core.Internal
public void Recycle() public void Recycle()
{ {
if (ownedByPool != null) if (returnToPoolAction != null)
{ {
Native.grpcsharp_request_call_context_reset(this); Native.grpcsharp_request_call_context_reset(this);
ownedByPool.Return(this);
var origReturnAction = returnToPoolAction;
// Not clearing all the references to the pool could prevent garbage collection of the pool object
// and thus cause memory leaks.
returnToPoolAction = null;
origReturnAction(this);
} }
else else
{ {

Loading…
Cancel
Save