Merge pull request #6699 from jtattermusch/csharp_streaming_api_improvements

C# streaming api improvements
pull/6724/head
Jan Tattermusch 9 years ago
commit 55ebba1205
  1. 16
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  2. 39
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  4. 90
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 77
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  6. 30
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  7. 94
      src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
  8. 8
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  9. 8
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs

@ -136,7 +136,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
@ -181,6 +180,21 @@ namespace Grpc.Core.Internal.Tests
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAfterWriteStatusThrowsInvalidOperationException()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
fakeCall.SendStatusFromServerHandler(true);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);

@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
() => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
@ -223,11 +224,13 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
CreateResponsePayload(),
null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
() => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
public void DuplexStreaming_CompleteAfterReceivingStatusFails()
public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var writeTask = requestStream.WriteAsync("request1");
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);

@ -86,7 +86,6 @@
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="ChannelCredentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="Internal\AsyncCall.cs" />

@ -32,12 +32,7 @@
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
StartSendMessageInternal(msg, writeFlags, completionDelegate);
return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
public Task SendCloseFromClientAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: true);
GrpcPreconditions.CheckState(started);
if (!disposed && !finished)
var earlyResult = CheckSendPreconditionsClientSide();
if (earlyResult != null)
{
call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
return earlyResult;
}
else
if (disposed || finished)
{
// In case the call has already been finished by the serverside,
// the halfclose has already been done implicitly, so we only
// emit the notification for the completion delegate.
Task.Run(() => HandleSendCloseFromClientFinished(true));
// the halfclose has already been done implicitly, so just return
// completed task here.
halfcloseRequested = true;
return Task.FromResult<object>(null);
}
call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
protected override Task CheckSendAllowedOrEarlyResult()
{
var earlyResult = CheckSendPreconditionsClientSide();
if (earlyResult != null)
{
return earlyResult;
}
if (finishedStatus.HasValue)
{
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
// Writing after the call has finished is not a programming error because server can close
// the call anytime, so don't throw directly, but let the write task finish with an error.
var tcs = new TaskCompletionSource<object>();
tcs.SetException(new RpcException(finishedStatus.Value.Status));
return tcs.Task;
}
return null;
}
private Task CheckSendPreconditionsClientSide()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
if (cancelRequested)
{
// Return a cancelled task.
var tcs = new TaskCompletionSource<object>();
tcs.SetCanceled();
return tcs.Task;
}
return null;
}
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
// TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
protected override void CheckSendingAllowed(bool allowFinished)
{
base.CheckSendingAllowed(true);
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
if (!allowFinished && finishedStatus.HasValue)
{
throw new RpcException(finishedStatus.Value.Status);
}
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>

@ -67,8 +67,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@ -128,28 +128,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckState(started);
var earlyResult = CheckSendAllowedOrEarlyResult();
if (earlyResult != null)
{
return earlyResult;
}
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@ -159,7 +162,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
// and maintain its state.
// and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@ -183,7 +186,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@ -213,24 +216,11 @@ namespace Grpc.Core.Internal
{
}
protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
GrpcPreconditions.CheckState(!disposed || allowFinished);
GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected void CheckNotCancelled()
{
if (cancelRequested)
{
throw new OperationCanceledException("Remote call has been cancelled.");
}
}
/// <summary>
/// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
/// logic by directly returning the write operation result task. Normally, null is returned.
/// </summary>
protected abstract Task CheckSendAllowedOrEarlyResult();
protected byte[] UnsafeSerialize(TWrite msg)
{
@ -259,39 +249,27 @@ namespace Grpc.Core.Internal
}
}
protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
{
try
{
completionDelegate(value, error);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking completion delegate.");
}
}
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
origTcs.SetResult(null);
}
}
@ -300,22 +278,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
// TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
origTcs.SetResult(null);
}
}

@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
StartSendMessageInternal(msg, writeFlags, completionDelegate);
return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
/// completionDelegate is invoked upon completion.
/// </summary>
public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
var earlyResult = CheckSendAllowedOrEarlyResult();
if (earlyResult != null)
{
return earlyResult;
}
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
sendCompletionDelegate = completionDelegate;
streamingWriteTcs = new TaskCompletionSource<object>();
return streamingWriteTcs.Task;
}
}
@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
protected override Task CheckSendAllowedOrEarlyResult()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
GrpcPreconditions.CheckState(!finished, "Already finished.");
GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
return null;
}
/// <summary>
/// Handles the server side close completion.
/// </summary>

@ -1,94 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// If error != null, there's been an error or operation has been cancelled.
/// </summary>
internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
/// <summary>
/// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
/// </summary>
internal class AsyncCompletionTaskSource<T>
{
readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
readonly AsyncCompletionDelegate<T> completionDelegate;
public AsyncCompletionTaskSource()
{
completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
}
public Task<T> Task
{
get
{
return tcs.Task;
}
}
public AsyncCompletionDelegate<T> CompletionDelegate
{
get
{
return completionDelegate;
}
}
private void HandleCompletion(T value, Exception error)
{
if (error == null)
{
tcs.SetResult(value);
return;
}
if (error is OperationCanceledException)
{
tcs.SetCanceled();
return;
}
tcs.SetException(error);
}
}
}

@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions

@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
return taskSource.Task;
return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions

Loading…
Cancel
Save