Merge pull request #6493 from jtattermusch/csharp_streaming_api_exceptions

C#: make writes throw RpcException if the client-side call has already finished
pull/6529/head
Jan Tattermusch 9 years ago
commit 1fc79fccf4
  1. 51
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  2. 15
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  3. 2
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  4. 4
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs

@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteFailure()
public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusFails()
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
[Test]
public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
requestStream.CompleteAsync();
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
fakeCall.SendCompletionHandler(true);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestFails()
public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterReceivingStatusFails()
public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestFails()
public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// Indicates that steaming call has finished.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
}
}
protected override void CheckSendingAllowed(bool allowFinished)
{
base.CheckSendingAllowed(true);
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
if (!allowFinished && finishedStatus.HasValue)
{
throw new RpcException(finishedStatus.Value.Status);
}
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>

@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
protected void CheckSendingAllowed(bool allowFinished)
protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();

@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
{
// Deadline was reached before write has started. Eat the exception and continue.
}
catch (RpcException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.

Loading…
Cancel
Save