Fixes for C# cancellation support

pull/1445/head
Jan Tattermusch 10 years ago
parent 1b54fcf31b
commit 8c2dd9d864
  1. 26
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 17
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  3. 11
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  4. 8
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs
  5. 37
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  6. 5
      src/csharp/Grpc.Core/Status.cs
  7. 2
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  8. 6
      src/csharp/ext/grpc_csharp_ext.c

@ -165,27 +165,6 @@ namespace Grpc.Core.Tests
}).Wait(); }).Wait();
} }
[Test]
public void ClientStreamingCall_ServerHandlerThrows()
{
Task.Run(async () =>
{
var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
var callResult = Calls.AsyncClientStreamingCall(call, CancellationToken.None);
// TODO(jtattermusch): if we send "A", "THROW", "C", server hangs.
await callResult.RequestStream.WriteAll(new string[] { "A", "B", "THROW" });
try
{
await callResult.Result;
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
}
}).Wait();
}
[Test] [Test]
public void ClientStreamingCall_CancelAfterBegin() public void ClientStreamingCall_CancelAfterBegin()
{ {
@ -195,6 +174,9 @@ namespace Grpc.Core.Tests
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var callResult = Calls.AsyncClientStreamingCall(call, cts.Token); var callResult = Calls.AsyncClientStreamingCall(call, cts.Token);
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
cts.Cancel(); cts.Cancel();
try try
@ -260,6 +242,8 @@ namespace Grpc.Core.Tests
} }
result += request; result += request;
}); });
// simulate processing takes some time.
await Task.Delay(250);
return result; return result;
} }
} }

@ -180,7 +180,8 @@ namespace Grpc.Core.Internal
{ {
if (!disposed && call != null) if (!disposed && call != null)
{ {
if (halfclosed && readingDone && finished) bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
if (noMoreSendCompletions && readingDone && finished)
{ {
ReleaseResources(); ReleaseResources();
return true; return true;
@ -207,8 +208,9 @@ namespace Grpc.Core.Internal
protected void CheckSendingAllowed() protected void CheckSendingAllowed()
{ {
Preconditions.CheckState(started); Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!errorOccured);
CheckNotCancelled();
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
@ -221,7 +223,14 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone, "Stream has already been closed."); Preconditions.CheckState(!readingDone, "Stream has already been closed.");
Preconditions.CheckState(readCompletionDelegate == null, "Only one write can be pending at a time"); Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
}
protected void CheckNotCancelled() {
if (cancelRequested)
{
throw new OperationCanceledException("Remote call has been cancelled.");
}
} }
protected byte[] UnsafeSerialize(TWrite msg) protected byte[] UnsafeSerialize(TWrite msg)
@ -292,6 +301,8 @@ namespace Grpc.Core.Internal
}); });
} }
/// <summary> /// <summary>
/// Handles send completion. /// Handles send completion.
/// </summary> /// </summary>

@ -123,18 +123,23 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
{ {
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
if (readCompletionDelegate == null) if (cancelled)
{ {
// allow disposal of native call // Once we cancel, we don't have to care that much
readingDone = true; // about reads and writes.
Cancel();
} }
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
// TODO(jtattermusch): check if call was cancelled.
// TODO: handle error ... // TODO: handle error ...
finishedServersideTcs.SetResult(null); finishedServersideTcs.SetResult(null);

@ -61,6 +61,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
{ {
SetHandle(handle); SetHandle(handle);
@ -94,5 +97,10 @@ namespace Grpc.Core.Internal
{ {
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
} }
public bool GetReceivedCloseOnServerCancelled()
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
} }
} }

@ -80,7 +80,14 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e); Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e); status = HandlerUtils.StatusFromException(e);
} }
try
{
await responseStream.WriteStatus(status); await responseStream.WriteStatus(status);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask; await finishedTask;
} }
} }
@ -121,7 +128,15 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e); Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e); status = HandlerUtils.StatusFromException(e);
} }
try
{
await responseStream.WriteStatus(status); await responseStream.WriteStatus(status);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask; await finishedTask;
} }
} }
@ -152,14 +167,29 @@ namespace Grpc.Core.Internal
try try
{ {
var result = await handler(requestStream); var result = await handler(requestStream);
try
{
await responseStream.Write(result); await responseStream.Write(result);
} }
catch (OperationCanceledException)
{
status = Status.DefaultCancelled;
}
}
catch (Exception e) catch (Exception e)
{ {
Console.WriteLine("Exception occured in handler: " + e); Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e); status = HandlerUtils.StatusFromException(e);
} }
try
{
await responseStream.WriteStatus(status); await responseStream.WriteStatus(status);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask; await finishedTask;
} }
} }
@ -196,7 +226,14 @@ namespace Grpc.Core.Internal
Console.WriteLine("Exception occured in handler: " + e); Console.WriteLine("Exception occured in handler: " + e);
status = HandlerUtils.StatusFromException(e); status = HandlerUtils.StatusFromException(e);
} }
try
{
await responseStream.WriteStatus(status); await responseStream.WriteStatus(status);
}
catch (OperationCanceledException)
{
// Call has been already cancelled.
}
await finishedTask; await finishedTask;
} }
} }

@ -44,6 +44,11 @@ namespace Grpc.Core
/// </summary> /// </summary>
public static readonly Status DefaultSuccess = new Status(StatusCode.OK, ""); public static readonly Status DefaultSuccess = new Status(StatusCode.OK, "");
/// <summary>
/// Default result of a cancelled RPC. StatusCode=Cancelled, empty details message.
/// </summary>
public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, "");
readonly StatusCode statusCode; readonly StatusCode statusCode;
readonly string detail; readonly string detail;

@ -366,6 +366,8 @@ namespace Grpc.IntegrationTesting
var cts = new CancellationTokenSource(); var cts = new CancellationTokenSource();
var call = client.StreamingInputCall(cts.Token); var call = client.StreamingInputCall(cts.Token);
// TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
await Task.Delay(1000);
cts.Cancel(); cts.Cancel();
try try

@ -277,6 +277,12 @@ grpcsharp_batch_context_server_rpc_new_method(
return ctx->server_rpc_new.call_details.method; return ctx->server_rpc_new.call_details.method;
} }
GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_batch_context_recv_close_on_server_cancelled(
const grpcsharp_batch_context *ctx) {
return (gpr_int32) ctx->recv_close_on_server_cancelled;
}
/* Init & shutdown */ /* Init & shutdown */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); } GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }

Loading…
Cancel
Save