Merge pull request #6416 from jtattermusch/csharp_api_polishing

Add C# wrapping tests and fix some issues with the API.
pull/6453/head
Jan Tattermusch 9 years ago
commit 05146ace34
  1. 63
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 2
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 203
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  4. 441
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  5. 183
      src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
  6. 5
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  7. 54
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  8. 34
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  9. 4
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
  10. 12
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  11. 4
      src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
  12. 7
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  13. 1
      src/csharp/tests.json

@ -166,6 +166,37 @@ namespace Grpc.Core.Tests
Assert.IsNotNull("xyz", call.GetTrailers()[0].Key);
}
[Test]
public async Task ServerStreamingCall_EndOfStreamIsIdempotent()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
Assert.IsFalse(await call.ResponseStream.MoveNext());
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
[Test]
public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
context.Status = new Status(StatusCode.InvalidArgument, "");
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
// attempting MoveNext again should result in throwing the same exception.
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode);
}
[Test]
public async Task DuplexStreamingCall()
{
@ -208,6 +239,38 @@ namespace Grpc.Core.Tests
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
[Test]
public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
{
var handlerStartedBarrier = new TaskCompletionSource<object>();
var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
var successTcs = new TaskCompletionSource<string>();
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
handlerStartedBarrier.SetResult(null);
// wait for cancellation to be delivered.
context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
await cancelNotificationReceivedBarrier.Task;
var moveNextResult = await requestStream.MoveNext();
successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
return "";
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await handlerStartedBarrier.Task;
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
Assert.AreEqual("SUCCESS", await successTcs.Task);
}
[Test]
public async Task AsyncUnaryCall_EchoMetadata()
{

@ -84,6 +84,8 @@
<Compile Include="SanityTest.cs" />
<Compile Include="HalfcloseTest.cs" />
<Compile Include="NUnitMain.cs" />
<Compile Include="Internal\FakeNativeCall.cs" />
<Compile Include="Internal\AsyncCallServerTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,203 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// Uses fake native call to test interaction of <c>AsyncCallServer</c> wrapping code with C core in different situations.
/// </summary>
public class AsyncCallServerTest
{
Server server;
FakeNativeCall fakeCall;
AsyncCallServer<string, string> asyncCallServer;
[SetUp]
public void Init()
{
var environment = GrpcEnvironment.AddRef();
// Create a fake server just so we have an instance to refer to.
// The server won't actually be used at all.
server = new Server()
{
Ports = { { "localhost", 0, ServerCredentials.Insecure } }
};
server.Start();
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
environment,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
[TearDown]
public void Cleanup()
{
server.ShutdownAsync().Wait();
GrpcEnvironment.Release();
}
[Test]
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void ReadAfterCancelNotificationCanSucceed()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
// Check that startin a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void ReadCompletionFailureClosesRequestStream()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteCompletionFailureThrows()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAndWriteStatusCanRunConcurrently()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);
Assert.IsTrue(finishedTask.IsCompleted);
Assert.DoesNotThrow(() => finishedTask.Wait());
}
}
}

@ -32,6 +32,7 @@
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@ -40,6 +41,9 @@ using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
/// </summary>
public class AsyncCallTest
{
Channel channel;
@ -75,8 +79,8 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_StreamingOperationsNotAllowed()
{
asyncCall.UnaryCallAsync("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartReadMessage((x,y) => {}));
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
@ -118,6 +122,14 @@ namespace Grpc.Core.Internal.Tests
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_StreamingReadNotAllowed()
{
asyncCall.ClientStreamingCallAsync();
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
}
[Test]
public void ClientStreaming_NoRequest_Success()
{
@ -142,6 +154,283 @@ namespace Grpc.Core.Internal.Tests
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
}
[Test]
public void ClientStreaming_MoreRequests_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(true);
writeTask.Wait();
var writeTask2 = requestStream.WriteAsync("request2");
fakeCall.SendCompletionHandler(true);
writeTask2.Wait();
var completeTask = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
completeTask.Wait();
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
public void ClientStreaming_WriteFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusFails()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
}
[Test]
public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestFails()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
}
[Test]
public void ServerStreaming_StreamingSendNotAllowed()
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
[Test]
public void ServerStreaming_NoResponse_Success1()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void ServerStreaming_NoResponse_Success2()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
// try alternative order of completions
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void ServerStreaming_NoResponse_ReadFailure()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
[Test]
public void ServerStreaming_MoreResponses_Success()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask2.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask3 = responseStream.MoveNext();
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
[Test]
public void DuplexStreaming_NoRequestNoResponse_Success()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask1 = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void DuplexStreaming_WriteAfterReceivingStatusFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
}
[Test]
public void DuplexStreaming_CompleteAfterReceivingStatusFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
}
[Test]
public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
{
asyncCall.StartDuplexStreamingCall();
var responseStream = new ClientResponseStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
[Test]
public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
{
asyncCall.StartDuplexStreamingCall();
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
{
return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
@ -163,6 +452,16 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual("response1", resultTask.Result);
}
static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
{
Assert.IsTrue(moveNextTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
Assert.IsFalse(moveNextTask.Result);
Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
{
Assert.IsTrue(resultTask.IsCompleted);
@ -175,135 +474,15 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
{
get;
set;
}
public bool IsCancelled
{
get;
set;
}
public bool IsDisposed
{
get;
set;
}
public void Cancel()
{
IsCancelled = true;
}
public void CancelWithStatus(Status status)
{
IsCancelled = true;
}
public string GetPeer()
{
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
UnaryResponseClientHandler = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
UnaryResponseClientHandler = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
SendCompletionHandler = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
ReceivedCloseOnServerHandler = callback;
}
public void Dispose()
{
IsDisposed = true;
}
static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
{
Assert.IsTrue(moveNextTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
}
}

@ -0,0 +1,183 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// For testing purposes.
/// </summary>
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
set;
}
public SendCompletionHandler SendStatusFromServerHandler
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
{
get;
set;
}
public bool IsCancelled
{
get;
set;
}
public bool IsDisposed
{
get;
set;
}
public void Cancel()
{
IsCancelled = true;
}
public void CancelWithStatus(Status status)
{
IsCancelled = true;
}
public string GetPeer()
{
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
UnaryResponseClientHandler = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
UnaryResponseClientHandler = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
SendCompletionHandler = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
SendStatusFromServerHandler = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
ReceivedCloseOnServerHandler = callback;
}
public void Dispose()
{
IsDisposed = true;
}
}
}

@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
public Task<TResponse> ReadMessageAsync()
{
StartReadMessageInternal(completionDelegate);
return ReadMessageInternalAsync();
}
/// <summary>

@ -68,7 +68,8 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@ -150,15 +151,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
// and maintain its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
streamingReadTcs = new TaskCompletionSource<TRead>();
return streamingReadTcs.Task;
}
}
@ -216,10 +227,6 @@ namespace Grpc.Core.Internal
protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
}
protected void CheckNotCancelled()
@ -322,22 +329,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
sendStatusFromServerTcs.SetResult(null);
}
}
@ -346,15 +349,17 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
// if success == false, received message will be null. It that case we will
// treat this completion as the last read an rely on C core to handle the failed
// read (e.g. deliver approriate statusCode on the clientside).
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
origCompletionDelegate = readCompletionDelegate;
readCompletionDelegate = null;
origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@ -364,20 +369,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
// TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
if (!readingDone)
{
streamingReadTcs = null;
}
ReleaseResourcesIfPossible();
}
// TODO: handle the case when success==false
if (deserializeException != null && !IsClient)
{
FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
FireCompletion(origCompletionDelegate, msg, null);
origTcs.SetResult(msg);
}
}
}

@ -64,6 +64,15 @@ namespace Grpc.Core.Internal
InitializeInternal(call);
}
/// <summary>
/// Only for testing purposes.
/// </summary>
public void InitializeForTesting(INativeCall call)
{
server.AddCallReference(this);
InitializeInternal(call);
}
/// <summary>
/// Starts a server side call.
/// </summary>
@ -91,11 +100,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
public Task<TRequest> ReadMessageAsync()
{
StartReadMessageInternal(completionDelegate);
return ReadMessageInternalAsync();
}
/// <summary>
@ -128,24 +136,25 @@ namespace Grpc.Core.Internal
}
/// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
public Task SendStatusFromServerAsync(Status status, Metadata trailers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
return sendStatusFromServerTcs.Task;
}
}
@ -190,12 +199,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
// success will be always set to true.
lock (myLock)
{
finished = true;
ReleaseResourcesIfPossible();
}
// TODO(jtattermusch): handle error
if (cancelled)
{

@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task.ConfigureAwait(false);
var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)

@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -149,7 +149,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -209,7 +209,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -260,7 +260,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -282,9 +282,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}

@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task.ConfigureAwait(false);
var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}

@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();

@ -1,5 +1,6 @@
{
"Grpc.Core.Tests": [
"Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",

Loading…
Cancel
Save