make SendStatusFromServer independent on WriteAsync

pull/6416/head
Jan Tattermusch 9 years ago
parent ce60d8e7a4
commit b32e29f0a2
  1. 9
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  2. 19
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  3. 12
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  4. 7
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs

@ -69,6 +69,7 @@ namespace Grpc.Core.Internal
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated. protected bool halfcloseRequested; // True if send close have been initiated.
@ -328,22 +329,18 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
protected void HandleSendStatusFromServerFinished(bool success) protected void HandleSendStatusFromServerFinished(bool success)
{ {
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock) lock (myLock)
{ {
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
if (!success) if (!success)
{ {
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
} }
else else
{ {
FireCompletion(origCompletionDelegate, null, null); sendStatusFromServerTcs.SetResult(null);
} }
} }

@ -136,24 +136,24 @@ namespace Grpc.Core.Internal
} }
/// <summary> /// <summary>
/// Sends call result status, also indicating server is done with streaming responses. /// Sends call result status, indicating we are done with writes.
/// Only one pending send action is allowed at any given time. /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// completionDelegate is called when the operation finishes.
/// </summary> /// </summary>
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate) public Task SendStatusFromServerAsync(Status status, Metadata trailers)
{ {
lock (myLock) lock (myLock)
{ {
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); GrpcPreconditions.CheckState(started);
CheckSendingAllowed(allowFinished: false); GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{ {
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
} }
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; sendStatusFromServerTcs = new TaskCompletionSource<object>();
sendCompletionDelegate = completionDelegate; return sendStatusFromServerTcs.Task;
} }
} }
@ -198,12 +198,13 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
private void HandleFinishedServerside(bool success, bool cancelled) private void HandleFinishedServerside(bool success, bool cancelled)
{ {
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
// success will be always set to true.
lock (myLock) lock (myLock)
{ {
finished = true; finished = true;
ReleaseResourcesIfPossible(); ReleaseResourcesIfPossible();
} }
// TODO(jtattermusch): handle error
if (cancelled) if (cancelled)
{ {

@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
} }
try try
{ {
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -149,7 +149,7 @@ namespace Grpc.Core.Internal
try try
{ {
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -209,7 +209,7 @@ namespace Grpc.Core.Internal
try try
{ {
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -260,7 +260,7 @@ namespace Grpc.Core.Internal
} }
try try
{ {
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
@ -282,9 +282,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false); await finishedTask.ConfigureAwait(false);
} }
} }

@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task; return taskSource.Task;
} }
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders) public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{ {
var taskSource = new AsyncCompletionTaskSource<object>(); var taskSource = new AsyncCompletionTaskSource<object>();

Loading…
Cancel
Save