Merge pull request #6560 from jtattermusch/csharp_channel_shutdown_improvements

Improve channel behavior in shutdown situations
pull/6445/head^2
Jan Tattermusch 9 years ago
commit 26dd2b8d6b
  1. 39
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  2. 34
      src/csharp/Grpc.Core/Channel.cs

@ -32,6 +32,7 @@
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@ -89,5 +90,43 @@ namespace Grpc.Core.Tests
channel.ShutdownAsync().Wait();
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync());
}
[Test]
public async Task ShutdownTokenCancelledAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested);
var shutdownTask = channel.ShutdownAsync();
Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested);
await shutdownTask;
}
[Test]
public async Task StateIsFatalFailureAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.AreEqual(ChannelState.FatalFailure, channel.State);
}
[Test]
public async Task ShutdownFinishesWaitForStateChangedAsync()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle);
var shutdownTask = channel.ShutdownAsync();
await stateChangedTask;
await shutdownTask;
}
[Test]
public async Task OperationsThrowAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle));
Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; });
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync());
}
}
}

@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -51,6 +52,7 @@ namespace Grpc.Core
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
readonly string target;
readonly GrpcEnvironment environment;
@ -101,12 +103,13 @@ namespace Grpc.Core
/// <summary>
/// Gets current connectivity state of this channel.
/// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
/// </summary>
public ChannelState State
{
get
{
return handle.CheckConnectivityState(false);
return GetConnectivityState(false);
}
}
@ -154,6 +157,17 @@ namespace Grpc.Core
}
}
/// <summary>
/// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
/// </summary>
public CancellationToken ShutdownToken
{
get
{
return this.shutdownTokenSource.Token;
}
}
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
@ -164,7 +178,7 @@ namespace Grpc.Core
/// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
public async Task ConnectAsync(DateTime? deadline = null)
{
var currentState = handle.CheckConnectivityState(true);
var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
if (currentState == ChannelState.FatalFailure)
@ -172,7 +186,7 @@ namespace Grpc.Core
throw new OperationCanceledException("Channel has reached FatalFailure state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = handle.CheckConnectivityState(false);
currentState = GetConnectivityState(false);
}
}
@ -188,6 +202,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
shutdownTokenSource.Cancel();
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
@ -231,6 +247,18 @@ namespace Grpc.Core
activeCallCounter.Decrement();
}
private ChannelState GetConnectivityState(bool tryToConnect)
{
try
{
return handle.CheckConnectivityState(tryToConnect);
}
catch (ObjectDisposedException)
{
return ChannelState.FatalFailure;
}
}
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;

Loading…
Cancel
Save