diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs index 64e16e4c54d..20e25f9d883 100644 --- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs +++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs @@ -64,7 +64,7 @@ namespace Grpc.Core.Internal { get { - return counter; + return Interlocked.Read(ref counter); } } } diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 19b44c26189..3c94b602c0d 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,8 +33,8 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); - static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); - static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); + const int FinishContinuationsSleepMillis = 10; + const int MaxFinishContinuationsSleepTotalMillis = 10000; readonly GrpcEnvironment environment; readonly object myLock = new object(); @@ -42,6 +42,9 @@ namespace Grpc.Core.Internal readonly int poolSize; readonly int completionQueueCount; readonly bool inlineHandlers; + readonly WaitCallback runCompletionQueueEventCallbackSuccess; + readonly WaitCallback runCompletionQueueEventCallbackFailure; + readonly AtomicCounter queuedContinuationCounter = new AtomicCounter(); readonly List threadProfilers = new List(); // profilers assigned to threadpool threads @@ -64,6 +67,9 @@ namespace Grpc.Core.Internal this.inlineHandlers = inlineHandlers; GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); + + this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); } public void Start() @@ -173,7 +179,8 @@ namespace Grpc.Core.Internal // Use cached delegates to avoid unnecessary allocations if (!inlineHandlers) { - ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + queuedContinuationCounter.Increment(); + ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback); } else { @@ -187,6 +194,24 @@ namespace Grpc.Core.Internal } } while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); + + // Continuations are running on default threadpool that consists of background threads. + // GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had + // been finished to prevent terminating the continuations queued prematurely. + int sleepIterations = 0; + while (queuedContinuationCounter.Count != 0) + { + // Only happens on shutdown and having pending continuations shouldn't very common, + // so sleeping here for a little bit is fine. + if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis) + { + Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).", + Thread.CurrentThread.Name); + break; + } + Thread.Sleep(FinishContinuationsSleepMillis); + sleepIterations ++; + } } private static IReadOnlyCollection CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount) @@ -200,7 +225,7 @@ namespace Grpc.Core.Internal return list.AsReadOnly(); } - private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) { try { @@ -210,6 +235,10 @@ namespace Grpc.Core.Internal { Logger.Error(e, "Exception occured while invoking completion delegate"); } + finally + { + queuedContinuationCounter.Decrement(); + } } } }