From 528fb6651cf6c19032e30dacdf437a3b7caf05e3 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 12 May 2016 08:38:41 -0700 Subject: [PATCH] improve channel behavior in shutdown situations --- src/csharp/Grpc.Core.Tests/ChannelTest.cs | 39 +++++++++++++++++++++++ src/csharp/Grpc.Core/Channel.cs | 34 ++++++++++++++++++-- 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 6330f50faed..850d70ce926 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -89,5 +90,43 @@ namespace Grpc.Core.Tests channel.ShutdownAsync().Wait(); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync()); } + + [Test] + public async Task ShutdownTokenCancelledAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested); + var shutdownTask = channel.ShutdownAsync(); + Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested); + await shutdownTask; + } + + [Test] + public async Task StateIsFatalFailureAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.AreEqual(ChannelState.FatalFailure, channel.State); + } + + [Test] + public async Task ShutdownFinishesWaitForStateChangedAsync() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle); + var shutdownTask = channel.ShutdownAsync(); + await stateChangedTask; + await shutdownTask; + } + + [Test] + public async Task OperationsThrowAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle)); + Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; }); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync()); + } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 89981b1849b..93a6e6a3d95 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -51,6 +52,7 @@ namespace Grpc.Core readonly object myLock = new object(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); readonly string target; readonly GrpcEnvironment environment; @@ -101,12 +103,13 @@ namespace Grpc.Core /// /// Gets current connectivity state of this channel. + /// After channel is has been shutdown, ChannelState.FatalFailure will be returned. /// public ChannelState State { get { - return handle.CheckConnectivityState(false); + return GetConnectivityState(false); } } @@ -154,6 +157,17 @@ namespace Grpc.Core } } + /// + /// Returns a token that gets cancelled once ShutdownAsync is invoked. + /// + public CancellationToken ShutdownToken + { + get + { + return this.shutdownTokenSource.Token; + } + } + /// /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, @@ -164,7 +178,7 @@ namespace Grpc.Core /// The deadline. null indicates no deadline. public async Task ConnectAsync(DateTime? deadline = null) { - var currentState = handle.CheckConnectivityState(true); + var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { if (currentState == ChannelState.FatalFailure) @@ -172,7 +186,7 @@ namespace Grpc.Core throw new OperationCanceledException("Channel has reached FatalFailure state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); - currentState = handle.CheckConnectivityState(false); + currentState = GetConnectivityState(false); } } @@ -188,6 +202,8 @@ namespace Grpc.Core shutdownRequested = true; } + shutdownTokenSource.Cancel(); + var activeCallCount = activeCallCounter.Count; if (activeCallCount > 0) { @@ -231,6 +247,18 @@ namespace Grpc.Core activeCallCounter.Decrement(); } + private ChannelState GetConnectivityState(bool tryToConnect) + { + try + { + return handle.CheckConnectivityState(tryToConnect); + } + catch (ObjectDisposedException) + { + return ChannelState.FatalFailure; + } + } + private static void EnsureUserAgentChannelOption(Dictionary options) { var key = ChannelOptions.PrimaryUserAgentString;