Merge pull request #6067 from jtattermusch/halfclose_after_close

Allow halfclose after close on clients
pull/6033/head^2
Jan Tattermusch 9 years ago
commit 1b7c0a2c5c
  1. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  2. 97
      src/csharp/Grpc.Core.Tests/HalfcloseTest.cs
  3. 14
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  4. 38
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  5. 6
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  6. 1
      src/csharp/tests.json

@ -93,6 +93,7 @@
<Compile Include="MetadataTest.cs" />
<Compile Include="PerformanceTest.cs" />
<Compile Include="SanityTest.cs" />
<Compile Include="HalfcloseTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,97 @@
#region Copyright notice and license
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class HalfcloseTest
{
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
/// <summary>
/// For client streaming and duplex streaming calls, if server does a full close
/// before we halfclose the request stream, an attempt to halfclose
/// (complete the request stream) shouldn't be treated as an error.
/// </summary>
[Test]
public async Task HalfcloseAfterFullclose_ClientStreamingCall()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
return "PASS";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
// make sure server has fullclosed on us
Assert.AreEqual("PASS", await call.ResponseAsync);
// sending close from client should be still fine because server can finish
// the call anytime and we cannot do anything about it on the client side.
await call.RequestStream.CompleteAsync();
// Second attempt to close from client is not allowed.
Assert.Throws(typeof(InvalidOperationException), async () => await call.RequestStream.CompleteAsync());
}
}
}

@ -258,9 +258,19 @@ namespace Grpc.Core.Internal
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
CheckSendingAllowed(allowFinished: true);
call.StartSendCloseFromClient(HandleHalfclosed);
if (!disposed && !finished)
{
call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
}
else
{
// In case the call has already been finished by the serverside,
// the halfclose has already been done implicitly, so we only
// emit the notification for the completion delegate.
Task.Run(() => HandleSendCloseFromClientFinished(true));
}
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;

@ -136,7 +136,7 @@ namespace Grpc.Core.Internal
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
CheckSendingAllowed(allowFinished: false);
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
@ -202,14 +202,14 @@ namespace Grpc.Core.Internal
{
}
protected void CheckSendingAllowed()
protected void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!disposed || allowFinished);
GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
GrpcPreconditions.CheckState(!finished, "Already finished.");
GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
@ -294,9 +294,33 @@ namespace Grpc.Core.Internal
}
/// <summary>
/// Handles halfclose completion.
/// Handles halfclose (send close from client) completion.
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
}
}
/// <summary>
/// Handles send status from server completion.
/// </summary>
protected void HandleHalfclosed(bool success)
protected void HandleSendStatusFromServerFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@ -309,7 +333,7 @@ namespace Grpc.Core.Internal
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed"));
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
}
else
{

@ -113,7 +113,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
CheckSendingAllowed();
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
@ -137,11 +137,11 @@ namespace Grpc.Core.Internal
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
CheckSendingAllowed(allowFinished: false);
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;

@ -21,6 +21,7 @@
"Grpc.Core.Tests.CompressionTest",
"Grpc.Core.Tests.ContextPropagationTest",
"Grpc.Core.Tests.GrpcEnvironmentTest",
"Grpc.Core.Tests.HalfcloseTest",
"Grpc.Core.Tests.MarshallingErrorsTest",
"Grpc.Core.Tests.MetadataTest",
"Grpc.Core.Tests.NUnitVersionTest",

Loading…
Cancel
Save