Merge pull request #8084 from jtattermusch/throw_rpcexception_on_failure

Fix wrong exceptions being thrown on send failure.
pull/8139/head
Jan Tattermusch 9 years ago committed by GitHub
commit c4f9c9a486
  1. 4
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 102
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 31
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  4. 40
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  5. 6
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs

@ -33,6 +33,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -149,8 +150,7 @@ namespace Grpc.Core.Internal.Tests
var writeTask = responseStream.WriteAsync("request1"); var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false); fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead? Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask); AssertFinished(asyncCallServer, fakeCall, finishedTask);

@ -180,21 +180,74 @@ namespace Grpc.Core.Internal.Tests
} }
[Test] [Test]
public void ClientStreaming_WriteCompletionFailure() public void ClientStreaming_WriteFailureThrowsRpcException()
{ {
var resultTask = asyncCall.ClientStreamingCallAsync(); var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall); var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1"); var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false); fakeCall.SendCompletionHandler(false);
// TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); // The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
fakeCall.UnaryResponseClientHandler(true, fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal), CreateClientSideStatus(StatusCode.Internal),
null, null,
new Metadata()); new Metadata());
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_WriteFailureThrowsRpcException2()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
fakeCall.SendCompletionHandler(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_WriteFailureThrowsRpcException3()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// Until the delayed write completion has been triggered,
// we still act as if there was an active write.
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
// Following attempts to write keep delivering the same status
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
} }
@ -415,6 +468,49 @@ namespace Grpc.Core.Internal.Tests
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
} }
[Test]
public void DuplexStreaming_WriteFailureThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// The write will wait for call to finish to receive the status code.
Assert.IsFalse(writeTask.IsCompleted);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
}
[Test]
public void DuplexStreaming_WriteFailureThrowsRpcException2()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionHandler(false);
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
}
[Test] [Test]
public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{ {

@ -341,6 +341,11 @@ namespace Grpc.Core.Internal
get { return true; } get { return true; }
} }
protected override Exception GetRpcExceptionClientOnly()
{
return new RpcException(finishedStatus.Value.Status);
}
protected override Task CheckSendAllowedOrEarlyResult() protected override Task CheckSendAllowedOrEarlyResult()
{ {
var earlyResult = CheckSendPreconditionsClientSide(); var earlyResult = CheckSendPreconditionsClientSide();
@ -452,6 +457,7 @@ namespace Grpc.Core.Internal
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{ {
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse); TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg); var deserializeException = TryDeserialize(receivedMessage, out msg);
@ -465,13 +471,23 @@ namespace Grpc.Core.Internal
} }
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
responseHeadersTcs.SetResult(responseHeaders); responseHeadersTcs.SetResult(responseHeaders);
var status = receivedStatus.Status; if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK) if (status.StatusCode != StatusCode.OK)
{ {
unaryResponseTcs.SetException(new RpcException(status)); unaryResponseTcs.SetException(new RpcException(status));
@ -490,16 +506,27 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true. // success will be always set to true.
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
var status = receivedStatus.Status; if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK) if (status.StatusCode != StatusCode.OK)
{ {
streamingCallFinishedTcs.SetException(new RpcException(status)); streamingCallFinishedTcs.SetException(new RpcException(status));

@ -69,6 +69,7 @@ namespace Grpc.Core.Internal
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs; protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.
protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated. protected bool halfcloseRequested; // True if send close have been initiated.
@ -200,6 +201,12 @@ namespace Grpc.Core.Internal
get; get;
} }
/// <summary>
/// Returns an exception to throw for a failed send operation.
/// It is only allowed to call this method for a call that has already finished.
/// </summary>
protected abstract Exception GetRpcExceptionClientOnly();
private void ReleaseResources() private void ReleaseResources()
{ {
if (call != null) if (call != null)
@ -252,18 +259,43 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
protected void HandleSendFinished(bool success) protected void HandleSendFinished(bool success)
{ {
bool delayCompletion = false;
TaskCompletionSource<object> origTcs = null; TaskCompletionSource<object> origTcs = null;
lock (myLock) lock (myLock)
{ {
origTcs = streamingWriteTcs; if (!success && !finished && IsClient) {
streamingWriteTcs = null; // We should be setting this only once per call, following writes will be short circuited
// because they cannot start until the entire call finishes.
GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
// leave streamingWriteTcs set, it will be completed once call finished.
isStreamingWriteCompletionDelayed = true;
delayCompletion = true;
}
else
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
if (!success) if (!success)
{ {
origTcs.SetException(new InvalidOperationException("Send failed")); if (!delayCompletion)
{
if (IsClient)
{
GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient
origTcs.SetException(GetRpcExceptionClientOnly());
}
else
{
origTcs.SetException (new IOException("Error sending from server."));
}
}
// if delayCompletion == true, postpone SetException until call finishes.
} }
else else
{ {
@ -283,7 +315,7 @@ namespace Grpc.Core.Internal
if (!success) if (!success)
{ {
sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
} }
else else
{ {

@ -33,6 +33,7 @@
using System; using System;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Runtime.InteropServices; using System.Runtime.InteropServices;
using System.Threading; using System.Threading;
@ -193,6 +194,11 @@ namespace Grpc.Core.Internal
get { return false; } get { return false; }
} }
protected override Exception GetRpcExceptionClientOnly()
{
throw new InvalidOperationException("Call be only called for client calls");
}
protected override void OnAfterReleaseResources() protected override void OnAfterReleaseResources()
{ {
server.RemoveCallReference(this); server.RemoveCallReference(this);

Loading…
Cancel
Save