Merge pull request #8535 from jtattermusch/lazy_streaming_call_finished

Lazy initialize streamingResponseCallFinishedTcs
pull/7556/head
Jan Tattermusch 8 years ago committed by GitHub
commit 3600bf01ee
  1. 17
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  2. 2
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs

@ -52,9 +52,8 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null. // Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs; TaskCompletionSource<TResponse> unaryResponseTcs;
// TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. // Completion of a streaming response call if not null.
// Indicates that response streaming call has finished. TaskCompletionSource<object> streamingResponseCallFinishedTcs;
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received. // Response headers set here once received.
@ -198,6 +197,7 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{ {
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
@ -219,6 +219,7 @@ namespace Grpc.Core.Internal
Initialize(details.Channel.CompletionQueue); Initialize(details.Channel.CompletionQueue);
streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{ {
call.StartDuplexStreaming(HandleFinished, metadataArray); call.StartDuplexStreaming(HandleFinished, metadataArray);
@ -276,13 +277,13 @@ namespace Grpc.Core.Internal
} }
/// <summary> /// <summary>
/// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise.
/// </summary> /// </summary>
public Task StreamingCallFinishedTask public Task StreamingResponseCallFinishedTask
{ {
get get
{ {
return streamingCallFinishedTcs.Task; return streamingResponseCallFinishedTcs.Task;
} }
} }
@ -529,11 +530,11 @@ namespace Grpc.Core.Internal
var status = receivedStatus.Status; var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK) if (status.StatusCode != StatusCode.OK)
{ {
streamingCallFinishedTcs.SetException(new RpcException(status)); streamingResponseCallFinishedTcs.SetException(new RpcException(status));
return; return;
} }
streamingCallFinishedTcs.SetResult(null); streamingResponseCallFinishedTcs.SetResult(null);
} }
} }
} }

@ -73,7 +73,7 @@ namespace Grpc.Core.Internal
if (result == null) if (result == null)
{ {
await call.StreamingCallFinishedTask.ConfigureAwait(false); await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
return false; return false;
} }
return true; return true;

Loading…
Cancel
Save