From 3af838a2d719c65c360f28ee8551c6012f408401 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch <jtattermusch@google.com> Date: Fri, 21 Aug 2015 13:48:52 -0700 Subject: [PATCH] simplify stream reads on client side --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 66 +++++++------------ .../Grpc.Core/Internal/AsyncCallBase.cs | 37 +++-------- .../Internal/ClientResponseStream.cs | 8 ++- 3 files changed, 40 insertions(+), 71 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 132b4264243..d687bb62832 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -56,14 +56,15 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // Indicates that steaming call has finished. + TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + // Response headers set here once received. TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); // Set after status is received. Used for both unary and streaming response calls. ClientSideStatus? finishedStatus; - bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) { @@ -74,8 +75,7 @@ namespace Grpc.Core.Internal /// <summary> /// This constructor should only be used for testing. /// </summary> - public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) - : this(callDetails) + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails) { this.injectedNativeCall = injectedNativeCall; } @@ -192,7 +192,6 @@ namespace Grpc.Core.Internal Initialize(environment.CompletionQueue); halfcloseRequested = true; - halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); @@ -260,6 +259,17 @@ namespace Grpc.Core.Internal } } + /// <summary> + /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. + /// </summary> + public Task StreamingCallFinishedTask + { + get + { + return streamingCallFinishedTcs.Task; + } + } + /// <summary> /// Get the task that completes once response headers are received. /// </summary> @@ -305,36 +315,6 @@ namespace Grpc.Core.Internal } } - /// <summary> - /// On client-side, we only fire readCompletionDelegate once all messages have been read - /// and status has been received. - /// </summary> - protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate) - { - if (completionDelegate != null && readingDone && finishedStatus.HasValue) - { - bool shouldComplete; - lock (myLock) - { - shouldComplete = !readObserverCompleted; - readObserverCompleted = true; - } - - if (shouldComplete) - { - var status = finishedStatus.Value.Status; - if (status.StatusCode != StatusCode.OK) - { - FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); - } - else - { - FireCompletion(completionDelegate, default(TResponse), null); - } - } - } - } - protected override void OnAfterReleaseResources() { details.Channel.RemoveCallReference(this); @@ -392,8 +372,6 @@ namespace Grpc.Core.Internal finished = true; finishedStatus = receivedStatus; - halfclosed = true; - ReleaseResourcesIfPossible(); } @@ -403,7 +381,6 @@ namespace Grpc.Core.Internal if (!success || status.StatusCode != StatusCode.OK) { - unaryResponseTcs.SetException(new RpcException(status)); return; } @@ -420,18 +397,23 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinished(bool success, ClientSideStatus receivedStatus) { - AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; finishedStatus = receivedStatus; - origReadCompletionDelegate = readCompletionDelegate; - ReleaseResourcesIfPossible(); } - ProcessLastRead(origReadCompletionDelegate); + var status = receivedStatus.Status; + + if (!success || status.StatusCode != StatusCode.OK) + { + streamingCallFinishedTcs.SetException(new RpcException(status)); + return; + } + + streamingCallFinishedTcs.SetResult(null); } } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 7744dbec002..4d203946449 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -61,19 +61,17 @@ namespace Grpc.Core.Internal protected bool disposed; protected bool started; - protected bool errorOccured; protected bool cancelRequested; protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected bool readingDone; - protected bool halfcloseRequested; - protected bool halfclosed; + protected bool readingDone; // True if last read (i.e. read with null payload) was already received. + protected bool halfcloseRequested; // True if send close have been initiated. protected bool finished; // True if close has been received from the peer. protected bool initialMetadataSent; - protected long streamingWritesCounter; + protected long streamingWritesCounter; // Number of streaming send operations started so far. public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) { @@ -161,16 +159,6 @@ namespace Grpc.Core.Internal } } - // TODO(jtattermusch): find more fitting name for this method. - /// <summary> - /// Default behavior just completes the read observer, but more sofisticated behavior might be required - /// by subclasses. - /// </summary> - protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate) - { - FireCompletion(completionDelegate, default(TRead), null); - } - /// <summary> /// If there are no more pending actions and no new actions can be started, releases /// the underlying native resources. @@ -179,7 +167,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = halfclosed || ((cancelRequested || finished) && sendCompletionDelegate == null); + bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -206,7 +194,6 @@ namespace Grpc.Core.Internal protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!errorOccured); CheckNotCancelled(); Preconditions.CheckState(!disposed); @@ -218,7 +205,6 @@ namespace Grpc.Core.Internal protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!disposed); Preconditions.CheckState(!readingDone, "Stream has already been closed."); @@ -312,7 +298,6 @@ namespace Grpc.Core.Internal AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { - halfclosed = true; origCompletionDelegate = sendCompletionDelegate; sendCompletionDelegate = null; @@ -338,15 +323,11 @@ namespace Grpc.Core.Internal lock (myLock) { origCompletionDelegate = readCompletionDelegate; - if (receivedMessage != null) - { - readCompletionDelegate = null; - } - else + readCompletionDelegate = null; + + if (receivedMessage == null) { - // This was the last read. Keeping the readCompletionDelegate - // to be either fired by this handler or by client-side finished - // handler. + // This was the last read. readingDone = true; } @@ -365,7 +346,7 @@ namespace Grpc.Core.Internal } else { - ProcessLastRead(origCompletionDelegate); + FireCompletion(origCompletionDelegate, default(TRead), null); } } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 6c445210381..b4a7335c7ce 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -72,7 +72,13 @@ namespace Grpc.Core.Internal call.StartReadMessage(taskSource.CompletionDelegate); var result = await taskSource.Task; this.current = result; - return result != null; + + if (result == null) + { + await call.StreamingCallFinishedTask; + return false; + } + return true; } public void Dispose()