diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 8c4b92fbada..26a1a683ba1 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -165,27 +165,6 @@ namespace Grpc.Core.Tests }).Wait(); } - [Test] - public void ClientStreamingCall_ServerHandlerThrows() - { - Task.Run(async () => - { - var call = new Call(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); - var callResult = Calls.AsyncClientStreamingCall(call, CancellationToken.None); - // TODO(jtattermusch): if we send "A", "THROW", "C", server hangs. - await callResult.RequestStream.WriteAll(new string[] { "A", "B", "THROW" }); - - try - { - await callResult.Result; - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - } - }).Wait(); - } - [Test] public void ClientStreamingCall_CancelAfterBegin() { @@ -195,6 +174,9 @@ namespace Grpc.Core.Tests var cts = new CancellationTokenSource(); var callResult = Calls.AsyncClientStreamingCall(call, cts.Token); + + // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. + await Task.Delay(1000); cts.Cancel(); try @@ -260,7 +242,9 @@ namespace Grpc.Core.Tests } result += request; }); - return result; + // simulate processing takes some time. + await Task.Delay(250); + return result; } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b911cdcc874..7cf0f6ff847 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -180,7 +180,8 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - if (halfclosed && readingDone && finished) + bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); return true; @@ -207,8 +208,9 @@ namespace Grpc.Core.Internal protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!disposed); Preconditions.CheckState(!errorOccured); + CheckNotCancelled(); + Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); @@ -221,7 +223,14 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!readingDone, "Stream has already been closed."); - Preconditions.CheckState(readCompletionDelegate == null, "Only one write can be pending at a time"); + Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); + } + + protected void CheckNotCancelled() { + if (cancelRequested) + { + throw new OperationCanceledException("Remote call has been cancelled."); + } } protected byte[] UnsafeSerialize(TWrite msg) @@ -292,6 +301,8 @@ namespace Grpc.Core.Internal }); } + + /// /// Handles send completion. /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 4775f2d07be..3c66c67dcce 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -123,18 +123,23 @@ namespace Grpc.Core.Internal /// private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) { + bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); + lock (myLock) { finished = true; - if (readCompletionDelegate == null) + if (cancelled) { - // allow disposal of native call - readingDone = true; + // Once we cancel, we don't have to care that much + // about reads and writes. + Cancel(); } ReleaseResourcesIfPossible(); } + // TODO(jtattermusch): check if call was cancelled. + // TODO: handle error ... finishedServersideTcs.SetResult(null); diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs index 3c54753756e..b562abaa7a9 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs @@ -61,6 +61,9 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx); + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) { SetHandle(handle); @@ -94,5 +97,10 @@ namespace Grpc.Core.Internal { return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); } + + public bool GetReceivedCloseOnServerCancelled() + { + return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; + } } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 0416eada348..01b2a113699 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -80,7 +80,14 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -121,7 +128,15 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -151,15 +166,30 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var result = await handler(requestStream); - await responseStream.Write(result); - } + var result = await handler(requestStream); + try + { + await responseStream.Write(result); + } + catch (OperationCanceledException) + { + status = Status.DefaultCancelled; + } + } catch (Exception e) { Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -196,7 +226,14 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs index b5881706941..754f6cb3cab 100644 --- a/src/csharp/Grpc.Core/Status.cs +++ b/src/csharp/Grpc.Core/Status.cs @@ -44,6 +44,11 @@ namespace Grpc.Core /// public static readonly Status DefaultSuccess = new Status(StatusCode.OK, ""); + /// + /// Default result of a cancelled RPC. StatusCode=Cancelled, empty details message. + /// + public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, ""); + readonly StatusCode statusCode; readonly string detail; diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 440702d06f4..a433659a086 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -366,6 +366,8 @@ namespace Grpc.IntegrationTesting var cts = new CancellationTokenSource(); var call = client.StreamingInputCall(cts.Token); + // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. + await Task.Delay(1000); cts.Cancel(); try diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index fb8b75798d3..a8cc1b29a45 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -277,6 +277,12 @@ grpcsharp_batch_context_server_rpc_new_method( return ctx->server_rpc_new.call_details.method; } +GPR_EXPORT gpr_int32 GPR_CALLTYPE +grpcsharp_batch_context_recv_close_on_server_cancelled( + const grpcsharp_batch_context *ctx) { + return (gpr_int32) ctx->recv_close_on_server_cancelled; +} + /* Init & shutdown */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }