Merge branch 'master' into mdzoba-add-gyp-build

pull/12206/head
Maxim Dzoba 8 years ago
commit 0e20b8bd1f
  1. 2
      src/csharp/Grpc.Core/Internal/AtomicCounter.cs
  2. 37
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs

@ -64,7 +64,7 @@ namespace Grpc.Core.Internal
{ {
get get
{ {
return counter; return Interlocked.Read(ref counter);
} }
} }
} }

@ -33,8 +33,8 @@ namespace Grpc.Core.Internal
internal class GrpcThreadPool internal class GrpcThreadPool
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<GrpcThreadPool>();
static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); const int FinishContinuationsSleepMillis = 10;
static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); const int MaxFinishContinuationsSleepTotalMillis = 10000;
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
readonly object myLock = new object(); readonly object myLock = new object();
@ -42,6 +42,9 @@ namespace Grpc.Core.Internal
readonly int poolSize; readonly int poolSize;
readonly int completionQueueCount; readonly int completionQueueCount;
readonly bool inlineHandlers; readonly bool inlineHandlers;
readonly WaitCallback runCompletionQueueEventCallbackSuccess;
readonly WaitCallback runCompletionQueueEventCallbackFailure;
readonly AtomicCounter queuedContinuationCounter = new AtomicCounter();
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
@ -64,6 +67,9 @@ namespace Grpc.Core.Internal
this.inlineHandlers = inlineHandlers; this.inlineHandlers = inlineHandlers;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used."); "Thread pool size cannot be smaller than the number of completion queues used.");
this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true));
this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false));
} }
public void Start() public void Start()
@ -173,7 +179,8 @@ namespace Grpc.Core.Internal
// Use cached delegates to avoid unnecessary allocations // Use cached delegates to avoid unnecessary allocations
if (!inlineHandlers) if (!inlineHandlers)
{ {
ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); queuedContinuationCounter.Increment();
ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback);
} }
else else
{ {
@ -187,6 +194,24 @@ namespace Grpc.Core.Internal
} }
} }
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
// Continuations are running on default threadpool that consists of background threads.
// GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had
// been finished to prevent terminating the continuations queued prematurely.
int sleepIterations = 0;
while (queuedContinuationCounter.Count != 0)
{
// Only happens on shutdown and having pending continuations shouldn't very common,
// so sleeping here for a little bit is fine.
if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis)
{
Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).",
Thread.CurrentThread.Name);
break;
}
Thread.Sleep(FinishContinuationsSleepMillis);
sleepIterations ++;
}
} }
private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount) private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
@ -200,7 +225,7 @@ namespace Grpc.Core.Internal
return list.AsReadOnly(); return list.AsReadOnly();
} }
private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success)
{ {
try try
{ {
@ -210,6 +235,10 @@ namespace Grpc.Core.Internal
{ {
Logger.Error(e, "Exception occured while invoking completion delegate"); Logger.Error(e, "Exception occured while invoking completion delegate");
} }
finally
{
queuedContinuationCounter.Decrement();
}
} }
} }
} }

Loading…
Cancel
Save