diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 6c13a4fa483..d92addbf548 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -166,6 +166,37 @@ namespace Grpc.Core.Tests Assert.IsNotNull("xyz", call.GetTrailers()[0].Key); } + [Test] + public async Task ServerStreamingCall_EndOfStreamIsIdempotent() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) => + { + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + + Assert.IsFalse(await call.ResponseStream.MoveNext()); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } + + [Test] + public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) => + { + context.Status = new Status(StatusCode.InvalidArgument, ""); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + + var ex = Assert.ThrowsAsync(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode); + + // attempting MoveNext again should result in throwing the same exception. + var ex2 = Assert.ThrowsAsync(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode); + } + [Test] public async Task DuplexStreamingCall() { @@ -208,6 +239,38 @@ namespace Grpc.Core.Tests Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); } + [Test] + public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() + { + var handlerStartedBarrier = new TaskCompletionSource(); + var cancelNotificationReceivedBarrier = new TaskCompletionSource(); + var successTcs = new TaskCompletionSource(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) => + { + handlerStartedBarrier.SetResult(null); + + // wait for cancellation to be delivered. + context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); + await cancelNotificationReceivedBarrier.Task; + + var moveNextResult = await requestStream.MoveNext(); + successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await handlerStartedBarrier.Task; + cts.Cancel(); + + var ex = Assert.ThrowsAsync(async () => await call.ResponseAsync); + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + + Assert.AreEqual("SUCCESS", await successTcs.Task); + } + [Test] public async Task AsyncUnaryCall_EchoMetadata() { diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 0cd059c2327..47131fc454e 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -84,6 +84,8 @@ + + diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs new file mode 100644 index 00000000000..0b6981f871d --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -0,0 +1,203 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// + /// Uses fake native call to test interaction of AsyncCallServer wrapping code with C core in different situations. + /// + public class AsyncCallServerTest + { + Server server; + FakeNativeCall fakeCall; + AsyncCallServer asyncCallServer; + + [SetUp] + public void Init() + { + var environment = GrpcEnvironment.AddRef(); + + // Create a fake server just so we have an instance to refer to. + // The server won't actually be used at all. + server = new Server() + { + Ports = { { "localhost", 0, ServerCredentials.Insecure } } + }; + server.Start(); + + fakeCall = new FakeNativeCall(); + asyncCallServer = new AsyncCallServer( + Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, + environment, + server); + asyncCallServer.InitializeForTesting(fakeCall); + } + + [TearDown] + public void Cleanup() + { + server.ShutdownAsync().Wait(); + GrpcEnvironment.Release(); + } + + [Test] + public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + + // Finishing requestStream is needed for dispose to happen. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void ReadAfterCancelNotificationCanSucceed() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + // Check that startin a read after cancel notification has been processed is legal. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void ReadCompletionFailureClosesRequestStream() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + + // if a read completion's success==false, the request stream will silently finish + // and we rely on C core cancelling the call. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(false, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteAfterCancelNotificationFails() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + var responseStream = new ServerResponseStream(asyncCallServer); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + // TODO(jtattermusch): should we throw a different exception type instead? + Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); + + // Finishing requestStream is needed for dispose to happen. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteCompletionFailureThrows() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + var responseStream = new ServerResponseStream(asyncCallServer); + + var writeTask = responseStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(false); + // TODO(jtattermusch): should we throw a different exception type instead? + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); + + // Finishing requestStream is needed for dispose to happen. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void WriteAndWriteStatusCanRunConcurrently() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var requestStream = new ServerRequestStream(asyncCallServer); + var responseStream = new ServerResponseStream(asyncCallServer); + + var writeTask = responseStream.WriteAsync("request1"); + var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata()); + + fakeCall.SendCompletionHandler(true); + fakeCall.SendStatusFromServerHandler(true); + + Assert.DoesNotThrowAsync(async () => await writeTask); + Assert.DoesNotThrowAsync(async () => await writeStatusTask); + + // Finishing requestStream is needed for dispose to happen. + var moveNextTask = requestStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + Assert.IsFalse(moveNextTask.Result); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + static void AssertFinished(AsyncCallServer asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) + { + Assert.IsTrue(fakeCall.IsDisposed); + Assert.IsTrue(finishedTask.IsCompleted); + Assert.DoesNotThrow(() => finishedTask.Wait()); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index a678e4dafe1..abe9d4a2e62 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Runtime.InteropServices; using System.Threading.Tasks; @@ -40,6 +41,9 @@ using NUnit.Framework; namespace Grpc.Core.Internal.Tests { + /// + /// Uses fake native call to test interaction of AsyncCall wrapping code with C core in different situations. + /// public class AsyncCallTest { Channel channel; @@ -75,8 +79,8 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_StreamingOperationsNotAllowed() { asyncCall.UnaryCallAsync("request1"); - Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartReadMessage((x,y) => {})); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); } @@ -118,6 +122,14 @@ namespace Grpc.Core.Internal.Tests AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); } + [Test] + public void ClientStreaming_StreamingReadNotAllowed() + { + asyncCall.ClientStreamingCallAsync(); + Assert.ThrowsAsync(typeof(InvalidOperationException), + async () => await asyncCall.ReadMessageAsync()); + } + [Test] public void ClientStreaming_NoRequest_Success() { @@ -142,6 +154,283 @@ namespace Grpc.Core.Internal.Tests AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); } + [Test] + public void ClientStreaming_MoreRequests_Success() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream(asyncCall); + + var writeTask = requestStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(true); + writeTask.Wait(); + + var writeTask2 = requestStream.WriteAsync("request2"); + fakeCall.SendCompletionHandler(true); + writeTask2.Wait(); + + var completeTask = requestStream.CompleteAsync(); + fakeCall.SendCompletionHandler(true); + completeTask.Wait(); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + } + + [Test] + public void ClientStreaming_WriteFailure() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream(asyncCall); + + var writeTask = requestStream.WriteAsync("request1"); + fakeCall.SendCompletionHandler(false); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); + + fakeCall.UnaryResponseClientHandler(true, + CreateClientSideStatus(StatusCode.Internal), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); + } + + [Test] + public void ClientStreaming_WriteAfterReceivingStatusFails() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream(asyncCall); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1")); + } + + [Test] + public void ClientStreaming_CompleteAfterReceivingStatusSucceeds() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream(asyncCall); + + fakeCall.UnaryResponseClientHandler(true, + new ClientSideStatus(Status.DefaultSuccess, new Metadata()), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); + Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); + } + + [Test] + public void ClientStreaming_WriteAfterCancellationRequestFails() + { + var resultTask = asyncCall.ClientStreamingCallAsync(); + var requestStream = new ClientRequestStream(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + fakeCall.UnaryResponseClientHandler(true, + CreateClientSideStatus(StatusCode.Cancelled), + CreateResponsePayload(), + new Metadata()); + + AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); + } + + [Test] + public void ServerStreaming_StreamingSendNotAllowed() + { + asyncCall.StartServerStreamingCall("request1"); + Assert.Throws(typeof(InvalidOperationException), + () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + } + + [Test] + public void ServerStreaming_NoResponse_Success1() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream(asyncCall); + var readTask = responseStream.MoveNext(); + + fakeCall.ReceivedResponseHeadersHandler(true, new Metadata()); + Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count); + + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void ServerStreaming_NoResponse_Success2() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream(asyncCall); + var readTask = responseStream.MoveNext(); + + // try alternative order of completions + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageHandler(true, null); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void ServerStreaming_NoResponse_ReadFailure() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream(asyncCall); + var readTask = responseStream.MoveNext(); + + fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code. + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal); + } + + [Test] + public void ServerStreaming_MoreResponses_Success() + { + asyncCall.StartServerStreamingCall("request1"); + var responseStream = new ClientResponseStream(asyncCall); + + var readTask1 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask2.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask3 = responseStream.MoveNext(); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + fakeCall.ReceivedMessageHandler(true, null); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3); + } + + [Test] + public void DuplexStreaming_NoRequestNoResponse_Success() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream(asyncCall); + var responseStream = new ClientResponseStream(asyncCall); + + var writeTask1 = requestStream.CompleteAsync(); + fakeCall.SendCompletionHandler(true); + Assert.DoesNotThrowAsync(async () => await writeTask1); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + } + + [Test] + public void DuplexStreaming_WriteAfterReceivingStatusFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream(asyncCall); + var responseStream = new ClientResponseStream(asyncCall); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1")); + } + + [Test] + public void DuplexStreaming_CompleteAfterReceivingStatusFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream(asyncCall); + var responseStream = new ClientResponseStream(asyncCall); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata())); + + AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); + + Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync()); + } + + [Test] + public void DuplexStreaming_WriteAfterCancellationRequestFails() + { + asyncCall.StartDuplexStreamingCall(); + var requestStream = new ClientRequestStream(asyncCall); + var responseStream = new ClientResponseStream(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + var readTask = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled); + } + + [Test] + public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed() + { + asyncCall.StartDuplexStreamingCall(); + var responseStream = new ClientResponseStream(asyncCall); + + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + var readTask1 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); + } + + [Test] + public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed() + { + asyncCall.StartDuplexStreamingCall(); + var responseStream = new ClientResponseStream(asyncCall); + + var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request + asyncCall.Cancel(); + Assert.IsTrue(fakeCall.IsCancelled); + + fakeCall.ReceivedMessageHandler(true, CreateResponsePayload()); + Assert.IsTrue(readTask1.Result); + Assert.AreEqual("response1", responseStream.Current); + + var readTask2 = responseStream.MoveNext(); + fakeCall.ReceivedMessageHandler(true, null); + fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled)); + + AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled); + } + ClientSideStatus CreateClientSideStatus(StatusCode statusCode) { return new ClientSideStatus(new Status(statusCode, ""), new Metadata()); @@ -163,6 +452,16 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual("response1", resultTask.Result); } + static void AssertStreamingResponseSuccess(AsyncCall asyncCall, FakeNativeCall fakeCall, Task moveNextTask) + { + Assert.IsTrue(moveNextTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + Assert.IsFalse(moveNextTask.Result); + Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); + Assert.AreEqual(0, asyncCall.GetTrailers().Count); + } + static void AssertUnaryResponseError(AsyncCall asyncCall, FakeNativeCall fakeCall, Task resultTask, StatusCode expectedStatusCode) { Assert.IsTrue(resultTask.IsCompleted); @@ -175,135 +474,15 @@ namespace Grpc.Core.Internal.Tests Assert.AreEqual(0, asyncCall.GetTrailers().Count); } - internal class FakeNativeCall : INativeCall - { - public UnaryResponseClientHandler UnaryResponseClientHandler - { - get; - set; - } - - public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler - { - get; - set; - } - - public ReceivedMessageHandler ReceivedMessageHandler - { - get; - set; - } - - public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler - { - get; - set; - } - - public SendCompletionHandler SendCompletionHandler - { - get; - set; - } - - public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler - { - get; - set; - } - - public bool IsCancelled - { - get; - set; - } - - public bool IsDisposed - { - get; - set; - } - - public void Cancel() - { - IsCancelled = true; - } - - public void CancelWithStatus(Status status) - { - IsCancelled = true; - } - - public string GetPeer() - { - return "PEER"; - } - - public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - UnaryResponseClientHandler = callback; - } - - public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - throw new NotImplementedException(); - } - - public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) - { - UnaryResponseClientHandler = callback; - } - - public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) - { - ReceivedStatusOnClientHandler = callback; - } - - public void StartReceiveMessage(ReceivedMessageHandler callback) - { - ReceivedMessageHandler = callback; - } - - public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) - { - ReceivedResponseHeadersHandler = callback; - } - - public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) - { - SendCompletionHandler = callback; - } - - public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartSendCloseFromClient(SendCompletionHandler callback) - { - SendCompletionHandler = callback; - } - - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) - { - SendCompletionHandler = callback; - } - - public void StartServerSide(ReceivedCloseOnServerHandler callback) - { - ReceivedCloseOnServerHandler = callback; - } - - public void Dispose() - { - IsDisposed = true; - } + static void AssertStreamingResponseError(AsyncCall asyncCall, FakeNativeCall fakeCall, Task moveNextTask, StatusCode expectedStatusCode) + { + Assert.IsTrue(moveNextTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + var ex = Assert.ThrowsAsync(async () => await moveNextTask); + Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode); + Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode); + Assert.AreEqual(0, asyncCall.GetTrailers().Count); } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs new file mode 100644 index 00000000000..1bec258ca29 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -0,0 +1,183 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + /// + /// For testing purposes. + /// + internal class FakeNativeCall : INativeCall + { + public UnaryResponseClientHandler UnaryResponseClientHandler + { + get; + set; + } + + public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + { + get; + set; + } + + public ReceivedMessageHandler ReceivedMessageHandler + { + get; + set; + } + + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + + public SendCompletionHandler SendCompletionHandler + { + get; + set; + } + + public SendCompletionHandler SendStatusFromServerHandler + { + get; + set; + } + + public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + { + get; + set; + } + + public bool IsCancelled + { + get; + set; + } + + public bool IsDisposed + { + get; + set; + } + + public void Cancel() + { + IsCancelled = true; + } + + public void CancelWithStatus(Status status) + { + IsCancelled = true; + } + + public string GetPeer() + { + return "PEER"; + } + + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + UnaryResponseClientHandler = callback; + } + + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + throw new NotImplementedException(); + } + + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) + { + UnaryResponseClientHandler = callback; + } + + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartReceiveMessage(ReceivedMessageHandler callback) + { + ReceivedMessageHandler = callback; + } + + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + SendCompletionHandler = callback; + } + + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartSendCloseFromClient(SendCompletionHandler callback) + { + SendCompletionHandler = callback; + } + + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + { + SendStatusFromServerHandler = callback; + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + ReceivedCloseOnServerHandler = callback; + } + + public void Dispose() + { + IsDisposed = true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 50ba617cdb1..f522174bd0f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -241,11 +241,10 @@ namespace Grpc.Core.Internal /// /// Receives a streaming response. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartReadMessage(AsyncCompletionDelegate completionDelegate) + public Task ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index ccd047f4695..18dbe87734b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,7 +68,8 @@ namespace Grpc.Core.Internal protected bool cancelRequested; protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected AsyncCompletionDelegate readCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -150,15 +151,25 @@ namespace Grpc.Core.Internal /// Initiates reading a message. Only one read operation can be active at a time. /// completionDelegate is invoked upon completion. /// - protected void StartReadMessageInternal(AsyncCompletionDelegate completionDelegate) + protected Task ReadMessageInternalAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckReadingAllowed(); + if (readingDone) + { + // the last read that returns null or throws an exception is idempotent + // and maintain its state. + GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); + return streamingReadTcs.Task; + } + + GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); call.StartReceiveMessage(HandleReadFinished); - readCompletionDelegate = completionDelegate; + streamingReadTcs = new TaskCompletionSource(); + return streamingReadTcs.Task; } } @@ -216,10 +227,6 @@ namespace Grpc.Core.Internal protected virtual void CheckReadingAllowed() { GrpcPreconditions.CheckState(started); - GrpcPreconditions.CheckState(!disposed); - - GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); - GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); } protected void CheckNotCancelled() @@ -322,22 +329,18 @@ namespace Grpc.Core.Internal /// protected void HandleSendStatusFromServerFinished(bool success) { - AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); } else { - FireCompletion(origCompletionDelegate, null, null); + sendStatusFromServerTcs.SetResult(null); } } @@ -346,15 +349,17 @@ namespace Grpc.Core.Internal /// protected void HandleReadFinished(bool success, byte[] receivedMessage) { + // if success == false, received message will be null. It that case we will + // treat this completion as the last read an rely on C core to handle the failed + // read (e.g. deliver approriate statusCode on the clientside). + TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; - AsyncCompletionDelegate origCompletionDelegate = null; + TaskCompletionSource origTcs = null; lock (myLock) { - origCompletionDelegate = readCompletionDelegate; - readCompletionDelegate = null; - + origTcs = streamingReadTcs; if (receivedMessage == null) { // This was the last read. @@ -364,20 +369,25 @@ namespace Grpc.Core.Internal if (deserializeException != null && IsClient) { readingDone = true; + + // TODO(jtattermusch): it might be too late to set the status CancelWithStatus(DeserializeResponseFailureStatus); } + if (!readingDone) + { + streamingReadTcs = null; + } + ReleaseResourcesIfPossible(); } - // TODO: handle the case when success==false - if (deserializeException != null && !IsClient) { - FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); + origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException)); return; } - FireCompletion(origCompletionDelegate, msg, null); + origTcs.SetResult(msg); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index bea2b3660c3..44f2988e21e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -64,6 +64,15 @@ namespace Grpc.Core.Internal InitializeInternal(call); } + /// + /// Only for testing purposes. + /// + public void InitializeForTesting(INativeCall call) + { + server.AddCallReference(this); + InitializeInternal(call); + } + /// /// Starts a server side call. /// @@ -91,11 +100,10 @@ namespace Grpc.Core.Internal /// /// Receives a streaming request. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartReadMessage(AsyncCompletionDelegate completionDelegate) + public Task ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// @@ -128,24 +136,25 @@ namespace Grpc.Core.Internal } /// - /// Sends call result status, also indicating server is done with streaming responses. - /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. + /// Sends call result status, indicating we are done with writes. + /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// - public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate completionDelegate) + public Task SendStatusFromServerAsync(Status status, Metadata trailers) { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once."); using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; - readingDone = true; - sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + sendStatusFromServerTcs = new TaskCompletionSource(); + return sendStatusFromServerTcs.Task; } } @@ -190,12 +199,13 @@ namespace Grpc.Core.Internal /// private void HandleFinishedServerside(bool success, bool cancelled) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, + // success will be always set to true. lock (myLock) { finished = true; ReleaseResourcesIfPossible(); } - // TODO(jtattermusch): handle error if (cancelled) { diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index d6e34a0f044..ad9423ff58c 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; if (result == null) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548e..bf9df9f783b 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -93,7 +93,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -149,7 +149,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -209,7 +209,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -260,7 +260,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -282,9 +282,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index e7be82c3185..d76030d1add 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; return result != null; } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 03e39efc024..ecfee0bfddb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -57,13 +57,6 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status, Metadata trailers) - { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); - return taskSource.Task; - } - public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource(); diff --git a/src/csharp/tests.json b/src/csharp/tests.json index f733352a310..f6af3408d51 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -1,5 +1,6 @@ { "Grpc.Core.Tests": [ + "Grpc.Core.Internal.Tests.AsyncCallServerTest", "Grpc.Core.Internal.Tests.AsyncCallTest", "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", "Grpc.Core.Internal.Tests.CompletionQueueEventTest",