diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0b6981f871d..058371521d6 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -80,16 +80,24 @@ namespace Grpc.Core.Internal.Tests [Test] public void CancelNotificationAfterStartDisposes() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + + [Test] + public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes() { var finishedTask = asyncCallServer.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCallServer); - // Finishing requestStream is needed for dispose to happen. var moveNextTask = requestStream.MoveNext(); + + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -101,9 +109,8 @@ namespace Grpc.Core.Internal.Tests fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - // Check that startin a read after cancel notification has been processed is legal. + // Check that starting a read after cancel notification has been processed is legal. var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); Assert.IsFalse(moveNextTask.Result); AssertFinished(asyncCallServer, fakeCall, finishedTask); @@ -136,12 +143,6 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1")); - - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -149,7 +150,6 @@ namespace Grpc.Core.Internal.Tests public void WriteCompletionFailureThrows() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream(asyncCallServer); var responseStream = new ServerResponseStream(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -157,13 +157,7 @@ namespace Grpc.Core.Internal.Tests // TODO(jtattermusch): should we throw a different exception type instead? Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); - AssertFinished(asyncCallServer, fakeCall, finishedTask); } @@ -171,7 +165,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAndWriteStatusCanRunConcurrently() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream(asyncCallServer); var responseStream = new ServerResponseStream(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); @@ -183,11 +176,6 @@ namespace Grpc.Core.Internal.Tests Assert.DoesNotThrowAsync(async () => await writeTask); Assert.DoesNotThrowAsync(async () => await writeStatusTask); - // Finishing requestStream is needed for dispose to happen. - var moveNextTask = requestStream.MoveNext(); - fakeCall.ReceivedMessageHandler(true, null); - Assert.IsFalse(moveNextTask.Result); - fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); AssertFinished(asyncCallServer, fakeCall, finishedTask); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 18dbe87734b..42234dcac21 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -155,7 +155,7 @@ namespace Grpc.Core.Internal { lock (myLock) { - CheckReadingAllowed(); + GrpcPreconditions.CheckState(started); if (readingDone) { // the last read that returns null or throws an exception is idempotent @@ -224,11 +224,6 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected virtual void CheckReadingAllowed() - { - GrpcPreconditions.CheckState(started); - } - protected void CheckNotCancelled() { if (cancelRequested) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 44f2988e21e..eafe2ccab87 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -183,12 +183,6 @@ namespace Grpc.Core.Internal get { return false; } } - protected override void CheckReadingAllowed() - { - base.CheckReadingAllowed(); - GrpcPreconditions.CheckArgument(!cancelRequested); - } - protected override void OnAfterReleaseResources() { server.RemoveCallReference(this); @@ -204,6 +198,14 @@ namespace Grpc.Core.Internal lock (myLock) { finished = true; + if (streamingReadTcs == null) + { + // if there's no pending read, readingDone=true will dispose now. + // if there is a pending read, we will dispose once that read finishes. + readingDone = true; + streamingReadTcs = new TaskCompletionSource(); + streamingReadTcs.SetResult(default(TRequest)); + } ReleaseResourcesIfPossible(); } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index bf9df9f783b..00d82d51e82 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -80,8 +80,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); var result = await handler(request, context).ConfigureAwait(false); status = context.Status; await responseStream.WriteAsync(result).ConfigureAwait(false); @@ -136,8 +134,6 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); await handler(request, responseStream, context).ConfigureAwait(false); status = context.Status; } @@ -298,7 +294,6 @@ namespace Grpc.Core.Internal return rpcException.Status; } - // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); }