From 7a3bd5b7d6cc22925582bdb5aec67d71848df803 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 14 Aug 2017 09:25:55 +0200 Subject: [PATCH 1/3] more correct atomic counter --- src/csharp/Grpc.Core/Internal/AtomicCounter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs index 64e16e4c54d..20e25f9d883 100644 --- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs +++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs @@ -64,7 +64,7 @@ namespace Grpc.Core.Internal { get { - return counter; + return Interlocked.Read(ref counter); } } } From 98ed73c38992384a3a7982d6c8e73b082b7f1169 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 14 Aug 2017 09:26:22 +0200 Subject: [PATCH 2/3] wait for queued continuation to finish --- .../Grpc.Core/Internal/GrpcThreadPool.cs | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 19b44c26189..ea72209178c 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,8 +33,7 @@ namespace Grpc.Core.Internal internal class GrpcThreadPool { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); - static readonly WaitCallback RunCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); - static readonly WaitCallback RunCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); + const int FinishContinuationsSleepMillis = 10; readonly GrpcEnvironment environment; readonly object myLock = new object(); @@ -42,6 +41,9 @@ namespace Grpc.Core.Internal readonly int poolSize; readonly int completionQueueCount; readonly bool inlineHandlers; + readonly WaitCallback runCompletionQueueEventCallbackSuccess; + readonly WaitCallback runCompletionQueueEventCallbackFailure; + readonly AtomicCounter queuedContinuationCounter = new AtomicCounter(); readonly List threadProfilers = new List(); // profilers assigned to threadpool threads @@ -64,6 +66,9 @@ namespace Grpc.Core.Internal this.inlineHandlers = inlineHandlers; GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, "Thread pool size cannot be smaller than the number of completion queues used."); + + this.runCompletionQueueEventCallbackSuccess = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, true)); + this.runCompletionQueueEventCallbackFailure = new WaitCallback((callback) => RunCompletionQueueEventCallback((OpCompletionDelegate) callback, false)); } public void Start() @@ -173,7 +178,8 @@ namespace Grpc.Core.Internal // Use cached delegates to avoid unnecessary allocations if (!inlineHandlers) { - ThreadPool.QueueUserWorkItem(success ? RunCompletionQueueEventCallbackSuccess : RunCompletionQueueEventCallbackFailure, callback); + queuedContinuationCounter.Increment(); + ThreadPool.QueueUserWorkItem(success ? runCompletionQueueEventCallbackSuccess : runCompletionQueueEventCallbackFailure, callback); } else { @@ -187,6 +193,16 @@ namespace Grpc.Core.Internal } } while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); + + // Continuations are running on default threadpool that consists of background threads. + // GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had + // been finished to prevent terminating the continuations queued prematurely. + while (queuedContinuationCounter.Count != 0) + { + // Only happens on shutdown and having pending continuations shouldn't very common, + // so sleeping here for a little bit is fine. + Thread.Sleep(FinishContinuationsSleepMillis); + } } private static IReadOnlyCollection CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount) @@ -200,7 +216,7 @@ namespace Grpc.Core.Internal return list.AsReadOnly(); } - private static void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) + private void RunCompletionQueueEventCallback(OpCompletionDelegate callback, bool success) { try { @@ -210,6 +226,10 @@ namespace Grpc.Core.Internal { Logger.Error(e, "Exception occured while invoking completion delegate"); } + finally + { + queuedContinuationCounter.Decrement(); + } } } } From 6bfe44daba9c2c549f5d3b435340c6e8ac1fee81 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 16 Aug 2017 20:46:07 +0200 Subject: [PATCH 3/3] give C# continuations 10 secs to finish on shutdown --- src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index ea72209178c..3c94b602c0d 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -34,6 +34,7 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); const int FinishContinuationsSleepMillis = 10; + const int MaxFinishContinuationsSleepTotalMillis = 10000; readonly GrpcEnvironment environment; readonly object myLock = new object(); @@ -197,11 +198,19 @@ namespace Grpc.Core.Internal // Continuations are running on default threadpool that consists of background threads. // GrpcThreadPool thread (a foreground thread) will not exit unless all queued work had // been finished to prevent terminating the continuations queued prematurely. + int sleepIterations = 0; while (queuedContinuationCounter.Count != 0) { // Only happens on shutdown and having pending continuations shouldn't very common, // so sleeping here for a little bit is fine. + if (sleepIterations >= MaxFinishContinuationsSleepTotalMillis / FinishContinuationsSleepMillis) + { + Logger.Warning("Shutting down gRPC thread [{0}] with unfinished callbacks (Timed out waiting for callbacks to finish).", + Thread.CurrentThread.Name); + break; + } Thread.Sleep(FinishContinuationsSleepMillis); + sleepIterations ++; } }