improve channel behavior in shutdown situations

pull/6560/head
Jan Tattermusch 9 years ago
parent 1acbe3e8a8
commit 528fb6651c
  1. 39
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  2. 34
      src/csharp/Grpc.Core/Channel.cs

@ -32,6 +32,7 @@
#endregion #endregion
using System; using System;
using System.Threading.Tasks;
using Grpc.Core; using Grpc.Core;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Utils; using Grpc.Core.Utils;
@ -89,5 +90,43 @@ namespace Grpc.Core.Tests
channel.ShutdownAsync().Wait(); channel.ShutdownAsync().Wait();
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync()); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync());
} }
[Test]
public async Task ShutdownTokenCancelledAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested);
var shutdownTask = channel.ShutdownAsync();
Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested);
await shutdownTask;
}
[Test]
public async Task StateIsFatalFailureAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.AreEqual(ChannelState.FatalFailure, channel.State);
}
[Test]
public async Task ShutdownFinishesWaitForStateChangedAsync()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle);
var shutdownTask = channel.ShutdownAsync();
await stateChangedTask;
await shutdownTask;
}
[Test]
public async Task OperationsThrowAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle));
Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; });
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync());
}
} }
} }

@ -32,6 +32,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Grpc.Core.Internal; using Grpc.Core.Internal;
@ -51,6 +52,7 @@ namespace Grpc.Core
readonly object myLock = new object(); readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter(); readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
readonly string target; readonly string target;
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
@ -101,12 +103,13 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// Gets current connectivity state of this channel. /// Gets current connectivity state of this channel.
/// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
/// </summary> /// </summary>
public ChannelState State public ChannelState State
{ {
get get
{ {
return handle.CheckConnectivityState(false); return GetConnectivityState(false);
} }
} }
@ -154,6 +157,17 @@ namespace Grpc.Core
} }
} }
/// <summary>
/// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
/// </summary>
public CancellationToken ShutdownToken
{
get
{
return this.shutdownTokenSource.Token;
}
}
/// <summary> /// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC. /// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached, /// Returned task completes once state Ready was seen. If the deadline is reached,
@ -164,7 +178,7 @@ namespace Grpc.Core
/// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param> /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
public async Task ConnectAsync(DateTime? deadline = null) public async Task ConnectAsync(DateTime? deadline = null)
{ {
var currentState = handle.CheckConnectivityState(true); var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready) while (currentState != ChannelState.Ready)
{ {
if (currentState == ChannelState.FatalFailure) if (currentState == ChannelState.FatalFailure)
@ -172,7 +186,7 @@ namespace Grpc.Core
throw new OperationCanceledException("Channel has reached FatalFailure state."); throw new OperationCanceledException("Channel has reached FatalFailure state.");
} }
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = handle.CheckConnectivityState(false); currentState = GetConnectivityState(false);
} }
} }
@ -188,6 +202,8 @@ namespace Grpc.Core
shutdownRequested = true; shutdownRequested = true;
} }
shutdownTokenSource.Cancel();
var activeCallCount = activeCallCounter.Count; var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0) if (activeCallCount > 0)
{ {
@ -231,6 +247,18 @@ namespace Grpc.Core
activeCallCounter.Decrement(); activeCallCounter.Decrement();
} }
private ChannelState GetConnectivityState(bool tryToConnect)
{
try
{
return handle.CheckConnectivityState(tryToConnect);
}
catch (ObjectDisposedException)
{
return ChannelState.FatalFailure;
}
}
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options) private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{ {
var key = ChannelOptions.PrimaryUserAgentString; var key = ChannelOptions.PrimaryUserAgentString;

Loading…
Cancel
Save