From afe1fe8a09712084b6cce54f88dff12d85da2b8b Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 19 May 2016 21:01:26 -0700 Subject: [PATCH 01/15] use tcs for streamingWrites --- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 4de23706b28..2784751b1cb 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -67,8 +67,8 @@ namespace Grpc.Core.Internal protected bool started; protected bool cancelRequested; - protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource streamingWriteTcs; // Completion of a pending streaming write if not null. protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. From 82e4581898bcee7dc83015fd5a1ae9eb25511757 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 19 May 2016 21:25:54 -0700 Subject: [PATCH 02/15] get rid of AsyncCompletionDelegate --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 4 +- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 12 ++--- .../Grpc.Core/Internal/AsyncCallBase.cs | 47 +++++++------------ .../Grpc.Core/Internal/AsyncCallServer.cs | 14 ++---- .../Grpc.Core/Internal/ClientRequestStream.cs | 8 +--- .../Internal/ServerResponseStream.cs | 8 +--- 6 files changed, 33 insertions(+), 60 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 777a1c8c500..d7e112c4155 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -82,7 +82,7 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] @@ -290,7 +290,7 @@ namespace Grpc.Core.Internal.Tests { asyncCall.StartServerStreamingCall("request1"); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 55351869b5c..fabe4e757fd 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -232,11 +232,10 @@ namespace Grpc.Core.Internal /// /// Sends a streaming request. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// @@ -250,13 +249,11 @@ namespace Grpc.Core.Internal /// /// Sends halfclose, indicating client is done with streaming requests. /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) + public Task SendCloseFromClientAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(allowFinished: true); if (!disposed && !finished) @@ -272,7 +269,8 @@ namespace Grpc.Core.Internal } halfcloseRequested = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource(); + return streamingWriteTcs.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 2784751b1cb..180b89db4d2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,7 +68,7 @@ namespace Grpc.Core.Internal protected bool cancelRequested; protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. - protected TaskCompletionSource streamingWriteTcs; // Completion of a pending streaming write if not null. + protected TaskCompletionSource streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. @@ -128,28 +128,26 @@ namespace Grpc.Core.Internal /// /// Initiates sending a message. Only one send operation can be active at a time. - /// completionDelegate is invoked upon completion. /// - protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags) { byte[] payload = UnsafeSerialize(msg); lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(allowFinished: false); call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); - sendCompletionDelegate = completionDelegate; initialMetadataSent = true; streamingWritesCounter++; + streamingWriteTcs = new TaskCompletionSource(); + return streamingWriteTcs.Task; } } /// /// Initiates reading a message. Only one read operation can be active at a time. - /// completionDelegate is invoked upon completion. /// protected Task ReadMessageInternalAsync() { @@ -183,7 +181,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); + bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -221,7 +219,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); } protected void CheckNotCancelled() @@ -259,39 +257,27 @@ namespace Grpc.Core.Internal } } - protected void FireCompletion(AsyncCompletionDelegate completionDelegate, T value, Exception error) - { - try - { - completionDelegate(value, error); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking completion delegate."); - } - } - /// /// Handles send completion. /// protected void HandleSendFinished(bool success) { - AsyncCompletionDelegate origCompletionDelegate = null; + TaskCompletionSource origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); + origTcs.SetException(new InvalidOperationException("Send failed")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } @@ -300,22 +286,23 @@ namespace Grpc.Core.Internal /// protected void HandleSendCloseFromClientFinished(bool success) { - AsyncCompletionDelegate origCompletionDelegate = null; + TaskCompletionSource origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed.")); + // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs). + origTcs.SetException(new InvalidOperationException("Sending close from client has failed.")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b1566b44a7c..b5dca4290f3 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -91,11 +91,10 @@ namespace Grpc.Core.Internal /// /// Sends a streaming response. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// - public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// @@ -110,28 +109,25 @@ namespace Grpc.Core.Internal /// Initiates sending a initial metadata. /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation /// to make things simpler. - /// completionDelegate is invoked upon completion. /// - public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate completionDelegate) + public Task SendInitialMetadataAsync(Metadata headers) { lock (myLock) { GrpcPreconditions.CheckNotNull(headers, "metadata"); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); CheckSendingAllowed(allowFinished: false); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { call.StartSendInitialMetadata(HandleSendFinished, metadataArray); } this.initialMetadataSent = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource(); + return streamingWriteTcs.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 013f00ff6fc..924de028f51 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -50,16 +50,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TRequest message) { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task CompleteAsync() { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendCloseFromClient(taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendCloseFromClientAsync(); } public WriteOptions WriteOptions diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index ecfee0bfddb..25b79b43988 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -52,16 +52,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendInitialMetadataAsync(responseHeaders); } public WriteOptions WriteOptions From 6620dc3e2db773a0271f19f2d74ab9335fb81c8a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 19 May 2016 21:29:18 -0700 Subject: [PATCH 03/15] remote AsyncCompletion --- src/csharp/Grpc.Core/Grpc.Core.csproj | 1 - .../Grpc.Core/Internal/AsyncCompletion.cs | 94 ------------------- 2 files changed, 95 deletions(-) delete mode 100644 src/csharp/Grpc.Core/Internal/AsyncCompletion.cs diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 4bf30e83c1b..a8b7b5f00d9 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -86,7 +86,6 @@ - diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs deleted file mode 100644 index 7e86fddb4d8..00000000000 --- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs +++ /dev/null @@ -1,94 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#endregion - -using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core.Internal; -using Grpc.Core.Utils; - -namespace Grpc.Core.Internal -{ - /// - /// If error != null, there's been an error or operation has been cancelled. - /// - internal delegate void AsyncCompletionDelegate(T result, Exception error); - - /// - /// Helper for transforming AsyncCompletionDelegate into full-fledged Task. - /// - internal class AsyncCompletionTaskSource - { - readonly TaskCompletionSource tcs = new TaskCompletionSource(); - readonly AsyncCompletionDelegate completionDelegate; - - public AsyncCompletionTaskSource() - { - completionDelegate = new AsyncCompletionDelegate(HandleCompletion); - } - - public Task Task - { - get - { - return tcs.Task; - } - } - - public AsyncCompletionDelegate CompletionDelegate - { - get - { - return completionDelegate; - } - } - - private void HandleCompletion(T value, Exception error) - { - if (error == null) - { - tcs.SetResult(value); - return; - } - if (error is OperationCanceledException) - { - tcs.SetCanceled(); - return; - } - tcs.SetException(error); - } - } -} From 421d2c411a06e2f5e5fd1e03e439c27df73cefc6 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 19 May 2016 21:36:09 -0700 Subject: [PATCH 04/15] add TODOs and remove unused imports --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index fabe4e757fd..b50580c40f0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -32,12 +32,7 @@ #endregion using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Profiling; using Grpc.Core.Utils; @@ -57,9 +52,11 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource unaryResponseTcs; + // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. // Indicates that response streaming call has finished. TaskCompletionSource streamingCallFinishedTcs = new TaskCompletionSource(); + // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // Response headers set here once received. TaskCompletionSource responseHeadersTcs = new TaskCompletionSource(); From 0aca838d7d33df47e3b7839b29c50855ab960ef8 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 11:01:54 -0400 Subject: [PATCH 05/15] add a server streaming api test --- .../Internal/AsyncCallServerTest.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0e204761f61..f14a61d34f8 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -181,6 +181,21 @@ namespace Grpc.Core.Internal.Tests AssertFinished(asyncCallServer, fakeCall, finishedTask); } + [Test] + public void WriteAfterWriteStatusThrowsInvalidOperationException() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var responseStream = new ServerResponseStream(asyncCallServer); + + asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1")); + + fakeCall.SendStatusFromServerHandler(true); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + static void AssertFinished(AsyncCallServer asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) { Assert.IsTrue(fakeCall.IsDisposed); From 84dcf0661ebe8436f52d3693ac9443ab1d7ac99b Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 12:13:34 -0400 Subject: [PATCH 06/15] cleanup in tests --- .../Grpc.Core.Tests/Internal/AsyncCallServerTest.cs | 1 - src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index f14a61d34f8..4e5a23b3c23 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -136,7 +136,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAfterCancelNotificationFails() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream(asyncCallServer); var responseStream = new ServerResponseStream(asyncCallServer); fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index d7e112c4155..ae9dd6a6bf5 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -103,7 +103,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.UnaryCallAsync("request1"); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -148,7 +148,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -193,7 +193,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Internal), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); @@ -223,7 +223,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange); @@ -279,7 +279,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); From 8472cc5bc5428b5006b9ca608d399159d7daeee1 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 12:41:57 -0400 Subject: [PATCH 07/15] preparation for write semantics cleanup --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 1 + src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index b50580c40f0..ab194121a7a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -251,6 +251,7 @@ namespace Grpc.Core.Internal { lock (myLock) { + GrpcPreconditions.CheckState(started); CheckSendingAllowed(allowFinished: true); if (!disposed && !finished) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 180b89db4d2..df313cbb734 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -135,6 +135,7 @@ namespace Grpc.Core.Internal lock (myLock) { + GrpcPreconditions.CheckState(started); CheckSendingAllowed(allowFinished: false); call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); @@ -213,7 +214,6 @@ namespace Grpc.Core.Internal protected virtual void CheckSendingAllowed(bool allowFinished) { - GrpcPreconditions.CheckState(started); CheckNotCancelled(); GrpcPreconditions.CheckState(!disposed || allowFinished); From 239fce134426d73eb8d433f618f22aab10821826 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 13:24:39 -0400 Subject: [PATCH 08/15] simplify implementation of SendCloseFromClient --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 2 +- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 14 ++++++-------- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index ae9dd6a6bf5..b4dd2c107e1 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -395,7 +395,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void DuplexStreaming_CompleteAfterReceivingStatusFails() + public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream(asyncCall); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index ab194121a7a..ad690bd2ecd 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -254,17 +254,15 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(started); CheckSendingAllowed(allowFinished: true); - if (!disposed && !finished) - { - call.StartSendCloseFromClient(HandleSendCloseFromClientFinished); - } - else + if (disposed || finished) { // In case the call has already been finished by the serverside, - // the halfclose has already been done implicitly, so we only - // emit the notification for the completion delegate. - Task.Run(() => HandleSendCloseFromClientFinished(true)); + // the halfclose has already been done implicitly, so just return + // completed task here. + halfcloseRequested = true; + return Task.FromResult(null); } + call.StartSendCloseFromClient(HandleSendCloseFromClientFinished); halfcloseRequested = true; streamingWriteTcs = new TaskCompletionSource(); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index df313cbb734..13f6309f6e5 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -158,7 +158,7 @@ namespace Grpc.Core.Internal if (readingDone) { // the last read that returns null or throws an exception is idempotent - // and maintain its state. + // and maintains its state. GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); return streamingReadTcs.Task; } From 6098848a3f7dade0691cb414c31007ef176ffca7 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 14:56:14 -0400 Subject: [PATCH 09/15] allow short-circuiting the send operation --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 29 ++++++++++++++----- .../Grpc.Core/Internal/AsyncCallBase.cs | 20 ++++++------- .../Grpc.Core/Internal/AsyncCallServer.cs | 20 ++++++++++++- 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index ad690bd2ecd..dec6eafd46e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -252,7 +252,7 @@ namespace Grpc.Core.Internal lock (myLock) { GrpcPreconditions.CheckState(started); - CheckSendingAllowed(allowFinished: true); + CheckSendPreconditionsClientSide(); if (disposed || finished) { @@ -437,17 +437,30 @@ namespace Grpc.Core.Internal } } - protected override void CheckSendingAllowed(bool allowFinished) + protected override Task CheckSendAllowedOrEarlyResult() { - base.CheckSendingAllowed(true); + CheckSendPreconditionsClientSide(); - // throwing RpcException if we already received status on client - // side makes the most sense. - // Note that this throws even for StatusCode.OK. - if (!allowFinished && finishedStatus.HasValue) + if (finishedStatus.HasValue) { - throw new RpcException(finishedStatus.Value.Status); + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + // Writing after the call has finished is not a programming error because server can close + // the call anytime, so don't throw directly, but let the write task finish with an error. + var tcs = new TaskCompletionSource(); + tcs.SetException(new RpcException(finishedStatus.Value.Status)); + return tcs.Task; } + + return null; + } + + private void CheckSendPreconditionsClientSide() + { + CheckNotCancelled(); + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 13f6309f6e5..d60876ddf37 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -136,7 +136,11 @@ namespace Grpc.Core.Internal lock (myLock) { GrpcPreconditions.CheckState(started); - CheckSendingAllowed(allowFinished: false); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); @@ -212,15 +216,11 @@ namespace Grpc.Core.Internal { } - protected virtual void CheckSendingAllowed(bool allowFinished) - { - CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed || allowFinished); - - GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); - GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); - } + /// + /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send + /// logic by directly returning the write operation result task. Normally, null is returned. + /// + protected abstract Task CheckSendAllowedOrEarlyResult(); protected void CheckNotCancelled() { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b5dca4290f3..a4f6e4d1b07 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -116,9 +116,15 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckNotNull(headers, "metadata"); + GrpcPreconditions.CheckState(started); GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); - CheckSendingAllowed(allowFinished: false); + + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { @@ -192,6 +198,18 @@ namespace Grpc.Core.Internal server.RemoveCallReference(this); } + protected override Task CheckSendAllowedOrEarlyResult() + { + CheckNotCancelled(); + GrpcPreconditions.CheckState(!disposed); + + GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); + GrpcPreconditions.CheckState(!finished, "Already finished."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + + return null; + } + /// /// Handles the server side close completion. /// From 56605efca630d693765ff2b10c253d8fbf5b0955 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 15:04:42 -0400 Subject: [PATCH 10/15] adjust the tests to reflect the correct send behavior on client --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index b4dd2c107e1..303a878742a 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests new Metadata()); AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); - var ex = Assert.Throws(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } @@ -227,7 +228,9 @@ namespace Grpc.Core.Internal.Tests new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange); - var ex = Assert.Throws(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync(async () => await writeTask); Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode); } @@ -275,6 +278,7 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); + // TODO: awaiting the writeTask should throw TaskCanceledException Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); fakeCall.UnaryResponseClientHandler(true, @@ -390,7 +394,8 @@ namespace Grpc.Core.Internal.Tests AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); - var ex = Assert.ThrowsAsync(async () => await requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } From 5c52f377bdd6f4cf0d16751d19ddada0c5bbceaa Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 15:12:02 -0400 Subject: [PATCH 11/15] improve ordering of serverside send checks --- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index a4f6e4d1b07..109c207c75c 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -201,11 +201,11 @@ namespace Grpc.Core.Internal protected override Task CheckSendAllowedOrEarlyResult() { CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed); GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); return null; } From 5468c27841a2b4ec26b28a293fcff84e84d7822f Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 15:18:35 -0400 Subject: [PATCH 12/15] reorder members for readability --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 52 +++++++++++----------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index dec6eafd46e..ff70efc8b3e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -336,6 +336,32 @@ namespace Grpc.Core.Internal get { return true; } } + protected override Task CheckSendAllowedOrEarlyResult() + { + CheckSendPreconditionsClientSide(); + + if (finishedStatus.HasValue) + { + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + // Writing after the call has finished is not a programming error because server can close + // the call anytime, so don't throw directly, but let the write task finish with an error. + var tcs = new TaskCompletionSource(); + tcs.SetException(new RpcException(finishedStatus.Value.Status)); + return tcs.Task; + } + + return null; + } + + private void CheckSendPreconditionsClientSide() + { + CheckNotCancelled(); + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + } + private void Initialize(CompletionQueueSafeHandle cq) { using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize")) @@ -437,32 +463,6 @@ namespace Grpc.Core.Internal } } - protected override Task CheckSendAllowedOrEarlyResult() - { - CheckSendPreconditionsClientSide(); - - if (finishedStatus.HasValue) - { - // throwing RpcException if we already received status on client - // side makes the most sense. - // Note that this throws even for StatusCode.OK. - // Writing after the call has finished is not a programming error because server can close - // the call anytime, so don't throw directly, but let the write task finish with an error. - var tcs = new TaskCompletionSource(); - tcs.SetException(new RpcException(finishedStatus.Value.Status)); - return tcs.Task; - } - - return null; - } - - private void CheckSendPreconditionsClientSide() - { - CheckNotCancelled(); - GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); - } - /// /// Handles receive status completion for calls with streaming response. /// From d9108331756823ba8c1af051d174bb416a87fdb1 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 15:58:48 -0400 Subject: [PATCH 13/15] clientside writes should finish with TaskCanceledException if cancel was previously requested --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 12 +++++---- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 26 ++++++++++++++++--- .../Grpc.Core/Internal/AsyncCallBase.cs | 8 ------ .../Grpc.Core/Internal/AsyncCallServer.cs | 2 -- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 303a878742a..81897f8c772 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -270,7 +270,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream(asyncCall); @@ -278,8 +278,8 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - // TODO: awaiting the writeTask should throw TaskCanceledException - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled), @@ -416,7 +416,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream(asyncCall); @@ -424,7 +424,9 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); fakeCall.ReceivedMessageHandler(true, null); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index ff70efc8b3e..8652b297c8a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -252,7 +252,12 @@ namespace Grpc.Core.Internal lock (myLock) { GrpcPreconditions.CheckState(started); - CheckSendPreconditionsClientSide(); + + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) + { + return earlyResult; + } if (disposed || finished) { @@ -338,7 +343,11 @@ namespace Grpc.Core.Internal protected override Task CheckSendAllowedOrEarlyResult() { - CheckSendPreconditionsClientSide(); + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) + { + return earlyResult; + } if (finishedStatus.HasValue) { @@ -355,11 +364,20 @@ namespace Grpc.Core.Internal return null; } - private void CheckSendPreconditionsClientSide() + private Task CheckSendPreconditionsClientSide() { - CheckNotCancelled(); + if (cancelRequested) + { + // Return a cancelled task. + var tcs = new TaskCompletionSource(); + tcs.SetCanceled(); + return tcs.Task; + } + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + + return null; } private void Initialize(CompletionQueueSafeHandle cq) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index d60876ddf37..5f561daedd7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -222,14 +222,6 @@ namespace Grpc.Core.Internal /// protected abstract Task CheckSendAllowedOrEarlyResult(); - protected void CheckNotCancelled() - { - if (cancelRequested) - { - throw new OperationCanceledException("Remote call has been cancelled."); - } - } - protected byte[] UnsafeSerialize(TWrite msg) { using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize")) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 109c207c75c..d1bb80762ef 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -200,8 +200,6 @@ namespace Grpc.Core.Internal protected override Task CheckSendAllowedOrEarlyResult() { - CheckNotCancelled(); - GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); From 6854c70c94af37f2504018d4274e9c1d107b98ed Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 16:06:01 -0400 Subject: [PATCH 14/15] reorder clientside send preconditions --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 8652b297c8a..10c1295e9a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -366,6 +366,9 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + if (cancelRequested) { // Return a cancelled task. @@ -374,9 +377,6 @@ namespace Grpc.Core.Internal return tcs.Task; } - GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); - return null; } From f581659215916398b9a304fe44787d145104030d Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 16:19:28 -0400 Subject: [PATCH 15/15] add TODO --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 10c1295e9a7..8669f0f7020 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -438,6 +438,7 @@ namespace Grpc.Core.Internal /// private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) { + // TODO(jtattermusch): handle success==false responseHeadersTcs.SetResult(responseHeaders); }