diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index abacfabadb6..18dbe87734b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -69,6 +69,7 @@ namespace Grpc.Core.Internal protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -328,22 +329,18 @@ namespace Grpc.Core.Internal /// protected void HandleSendStatusFromServerFinished(bool success) { - AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); } else { - FireCompletion(origCompletionDelegate, null, null); + sendStatusFromServerTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index efcf4ea7fea..94f49bd8f2a 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -136,24 +136,24 @@ namespace Grpc.Core.Internal } /// - /// Sends call result status, also indicating server is done with streaming responses. - /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. + /// Sends call result status, indicating we are done with writes. + /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// - public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate completionDelegate) + public Task SendStatusFromServerAsync(Status status, Metadata trailers) { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once."); using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; - readingDone = true; - sendCompletionDelegate = completionDelegate; + sendStatusFromServerTcs = new TaskCompletionSource(); + return sendStatusFromServerTcs.Task; } } @@ -198,12 +198,13 @@ namespace Grpc.Core.Internal /// private void HandleFinishedServerside(bool success, bool cancelled) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, + // success will be always set to true. lock (myLock) { finished = true; ReleaseResourcesIfPossible(); } - // TODO(jtattermusch): handle error if (cancelled) { diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548e..bf9df9f783b 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -93,7 +93,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -149,7 +149,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -209,7 +209,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -260,7 +260,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); } catch (OperationCanceledException) { @@ -282,9 +282,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 03e39efc024..ecfee0bfddb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -57,13 +57,6 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status, Metadata trailers) - { - var taskSource = new AsyncCompletionTaskSource(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); - return taskSource.Task; - } - public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource();