Merge pull request #6641 from jtattermusch/backport_csharp_to_14_2

Backport C# changes to 0.14 branch
pull/7068/head
Jan Tattermusch 9 years ago
commit 3cdb0ef3d0
  1. 39
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  2. 51
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 34
      src/csharp/Grpc.Core/Channel.cs
  4. 15
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 2
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  6. 4
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs

@ -32,6 +32,7 @@
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@ -89,5 +90,43 @@ namespace Grpc.Core.Tests
channel.ShutdownAsync().Wait();
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync());
}
[Test]
public async Task ShutdownTokenCancelledAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested);
var shutdownTask = channel.ShutdownAsync();
Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested);
await shutdownTask;
}
[Test]
public async Task StateIsFatalFailureAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.AreEqual(ChannelState.FatalFailure, channel.State);
}
[Test]
public async Task ShutdownFinishesWaitForStateChangedAsync()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle);
var shutdownTask = channel.ShutdownAsync();
await stateChangedTask;
await shutdownTask;
}
[Test]
public async Task OperationsThrowAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle));
Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; });
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync());
}
}
}

@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteFailure()
public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusFails()
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
[Test]
public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
requestStream.CompleteAsync();
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
fakeCall.SendCompletionHandler(true);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestFails()
public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterReceivingStatusFails()
public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestFails()
public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -51,6 +52,7 @@ namespace Grpc.Core
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
readonly string target;
readonly GrpcEnvironment environment;
@ -101,12 +103,13 @@ namespace Grpc.Core
/// <summary>
/// Gets current connectivity state of this channel.
/// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
/// </summary>
public ChannelState State
{
get
{
return handle.CheckConnectivityState(false);
return GetConnectivityState(false);
}
}
@ -154,6 +157,17 @@ namespace Grpc.Core
}
}
/// <summary>
/// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
/// </summary>
public CancellationToken ShutdownToken
{
get
{
return this.shutdownTokenSource.Token;
}
}
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
@ -164,7 +178,7 @@ namespace Grpc.Core
/// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
public async Task ConnectAsync(DateTime? deadline = null)
{
var currentState = handle.CheckConnectivityState(true);
var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
if (currentState == ChannelState.FatalFailure)
@ -172,7 +186,7 @@ namespace Grpc.Core
throw new OperationCanceledException("Channel has reached FatalFailure state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = handle.CheckConnectivityState(false);
currentState = GetConnectivityState(false);
}
}
@ -188,6 +202,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
shutdownTokenSource.Cancel();
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
@ -231,6 +247,18 @@ namespace Grpc.Core
activeCallCounter.Decrement();
}
private ChannelState GetConnectivityState(bool tryToConnect)
{
try
{
return handle.CheckConnectivityState(tryToConnect);
}
catch (ObjectDisposedException)
{
return ChannelState.FatalFailure;
}
}
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;

@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// Indicates that steaming call has finished.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
}
}
protected override void CheckSendingAllowed(bool allowFinished)
{
base.CheckSendingAllowed(true);
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
if (!allowFinished && finishedStatus.HasValue)
{
throw new RpcException(finishedStatus.Value.Status);
}
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>

@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
protected void CheckSendingAllowed(bool allowFinished)
protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();

@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
{
// Deadline was reached before write has started. Eat the exception and continue.
}
catch (RpcException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.

Loading…
Cancel
Save