finishing serverside request stream should not be required for disposal

pull/6420/head
Jan Tattermusch 9 years ago
parent f21f465bce
commit 739e86c394
  1. 34
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 7
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  3. 14
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs

@ -80,16 +80,24 @@ namespace Grpc.Core.Internal.Tests
[Test]
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -101,9 +109,8 @@ namespace Grpc.Core.Internal.Tests
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
// Check that startin a read after cancel notification has been processed is legal.
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
@ -136,12 +143,6 @@ namespace Grpc.Core.Internal.Tests
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -149,7 +150,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteCompletionFailureThrows()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
@ -157,13 +157,7 @@ namespace Grpc.Core.Internal.Tests
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
@ -171,7 +165,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteAndWriteStatusCanRunConcurrently()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
@ -183,11 +176,6 @@ namespace Grpc.Core.Internal.Tests
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
// Finishing requestStream is needed for dispose to happen.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);

@ -155,7 +155,7 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
CheckReadingAllowed();
GrpcPreconditions.CheckState(started);
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
@ -224,11 +224,6 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
}
protected void CheckNotCancelled()
{
if (cancelRequested)

@ -183,12 +183,6 @@ namespace Grpc.Core.Internal
get { return false; }
}
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
GrpcPreconditions.CheckArgument(!cancelRequested);
}
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@ -204,6 +198,14 @@ namespace Grpc.Core.Internal
lock (myLock)
{
finished = true;
if (streamingReadTcs == null)
{
// if there's no pending read, readingDone=true will dispose now.
// if there is a pending read, we will dispose once that read finishes.
readingDone = true;
streamingReadTcs = new TaskCompletionSource<TRequest>();
streamingReadTcs.SetResult(default(TRequest));
}
ReleaseResourcesIfPossible();
}

Loading…
Cancel
Save