simplify stream reads on client side

pull/3029/head
Jan Tattermusch 10 years ago
parent fb34a99d98
commit 3af838a2d7
  1. 66
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  2. 35
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  3. 8
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs

@ -56,14 +56,15 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null. // Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs; TaskCompletionSource<TResponse> unaryResponseTcs;
// Indicates that steaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received. // Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
// Set after status is received. Used for both unary and streaming response calls. // Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus; ClientSideStatus? finishedStatus;
bool readObserverCompleted; // True if readObserver has already been completed.
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
{ {
@ -74,8 +75,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// This constructor should only be used for testing. /// This constructor should only be used for testing.
/// </summary> /// </summary>
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
: this(callDetails)
{ {
this.injectedNativeCall = injectedNativeCall; this.injectedNativeCall = injectedNativeCall;
} }
@ -192,7 +192,6 @@ namespace Grpc.Core.Internal
Initialize(environment.CompletionQueue); Initialize(environment.CompletionQueue);
halfcloseRequested = true; halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
@ -260,6 +259,17 @@ namespace Grpc.Core.Internal
} }
} }
/// <summary>
/// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
/// </summary>
public Task StreamingCallFinishedTask
{
get
{
return streamingCallFinishedTcs.Task;
}
}
/// <summary> /// <summary>
/// Get the task that completes once response headers are received. /// Get the task that completes once response headers are received.
/// </summary> /// </summary>
@ -305,36 +315,6 @@ namespace Grpc.Core.Internal
} }
} }
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
/// </summary>
protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
{
if (completionDelegate != null && readingDone && finishedStatus.HasValue)
{
bool shouldComplete;
lock (myLock)
{
shouldComplete = !readObserverCompleted;
readObserverCompleted = true;
}
if (shouldComplete)
{
var status = finishedStatus.Value.Status;
if (status.StatusCode != StatusCode.OK)
{
FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
}
else
{
FireCompletion(completionDelegate, default(TResponse), null);
}
}
}
}
protected override void OnAfterReleaseResources() protected override void OnAfterReleaseResources()
{ {
details.Channel.RemoveCallReference(this); details.Channel.RemoveCallReference(this);
@ -392,8 +372,6 @@ namespace Grpc.Core.Internal
finished = true; finished = true;
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
halfclosed = true;
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
@ -403,7 +381,6 @@ namespace Grpc.Core.Internal
if (!success || status.StatusCode != StatusCode.OK) if (!success || status.StatusCode != StatusCode.OK)
{ {
unaryResponseTcs.SetException(new RpcException(status)); unaryResponseTcs.SetException(new RpcException(status));
return; return;
} }
@ -420,18 +397,23 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
private void HandleFinished(bool success, ClientSideStatus receivedStatus) private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{ {
AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
finishedStatus = receivedStatus; finishedStatus = receivedStatus;
origReadCompletionDelegate = readCompletionDelegate;
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
ProcessLastRead(origReadCompletionDelegate); var status = receivedStatus.Status;
if (!success || status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
return;
}
streamingCallFinishedTcs.SetResult(null);
} }
} }
} }

@ -61,19 +61,17 @@ namespace Grpc.Core.Internal
protected bool disposed; protected bool disposed;
protected bool started; protected bool started;
protected bool errorOccured;
protected bool cancelRequested; protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected bool readingDone; protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; protected bool halfcloseRequested; // True if send close have been initiated.
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer. protected bool finished; // True if close has been received from the peer.
protected bool initialMetadataSent; protected bool initialMetadataSent;
protected long streamingWritesCounter; protected long streamingWritesCounter; // Number of streaming send operations started so far.
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
{ {
@ -161,16 +159,6 @@ namespace Grpc.Core.Internal
} }
} }
// TODO(jtattermusch): find more fitting name for this method.
/// <summary>
/// Default behavior just completes the read observer, but more sofisticated behavior might be required
/// by subclasses.
/// </summary>
protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
{
FireCompletion(completionDelegate, default(TRead), null);
}
/// <summary> /// <summary>
/// If there are no more pending actions and no new actions can be started, releases /// If there are no more pending actions and no new actions can be started, releases
/// the underlying native resources. /// the underlying native resources.
@ -179,7 +167,7 @@ namespace Grpc.Core.Internal
{ {
if (!disposed && call != null) if (!disposed && call != null)
{ {
bool noMoreSendCompletions = halfclosed || ((cancelRequested || finished) && sendCompletionDelegate == null); bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished) if (noMoreSendCompletions && readingDone && finished)
{ {
ReleaseResources(); ReleaseResources();
@ -206,7 +194,6 @@ namespace Grpc.Core.Internal
protected void CheckSendingAllowed() protected void CheckSendingAllowed()
{ {
Preconditions.CheckState(started); Preconditions.CheckState(started);
Preconditions.CheckState(!errorOccured);
CheckNotCancelled(); CheckNotCancelled();
Preconditions.CheckState(!disposed); Preconditions.CheckState(!disposed);
@ -218,7 +205,6 @@ namespace Grpc.Core.Internal
protected virtual void CheckReadingAllowed() protected virtual void CheckReadingAllowed()
{ {
Preconditions.CheckState(started); Preconditions.CheckState(started);
Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!disposed); Preconditions.CheckState(!disposed);
Preconditions.CheckState(!readingDone, "Stream has already been closed."); Preconditions.CheckState(!readingDone, "Stream has already been closed.");
@ -312,7 +298,6 @@ namespace Grpc.Core.Internal
AsyncCompletionDelegate<object> origCompletionDelegate = null; AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock) lock (myLock)
{ {
halfclosed = true;
origCompletionDelegate = sendCompletionDelegate; origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null; sendCompletionDelegate = null;
@ -338,15 +323,11 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
origCompletionDelegate = readCompletionDelegate; origCompletionDelegate = readCompletionDelegate;
if (receivedMessage != null)
{
readCompletionDelegate = null; readCompletionDelegate = null;
}
else if (receivedMessage == null)
{ {
// This was the last read. Keeping the readCompletionDelegate // This was the last read.
// to be either fired by this handler or by client-side finished
// handler.
readingDone = true; readingDone = true;
} }
@ -365,7 +346,7 @@ namespace Grpc.Core.Internal
} }
else else
{ {
ProcessLastRead(origCompletionDelegate); FireCompletion(origCompletionDelegate, default(TRead), null);
} }
} }
} }

@ -72,7 +72,13 @@ namespace Grpc.Core.Internal
call.StartReadMessage(taskSource.CompletionDelegate); call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task; var result = await taskSource.Task;
this.current = result; this.current = result;
return result != null;
if (result == null)
{
await call.StreamingCallFinishedTask;
return false;
}
return true;
} }
public void Dispose() public void Dispose()

Loading…
Cancel
Save