Merge pull request #6754 from jtattermusch/csharp_better_shutdown

C#: don't require Channel.ShutdownAsync() and Server.ShutdownAsync() to be able to exit.
pull/6758/head^2
Jan Tattermusch 9 years ago
commit d941fd8786
  1. 90
      src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
  2. 4
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 12
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  4. 4
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
  5. 2
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  6. 57
      src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs
  7. 69
      src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs
  8. 58
      src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs
  9. 4
      src/csharp/Grpc.Core/Channel.cs
  10. 121
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  11. 28
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  12. 81
      src/csharp/Grpc.Core/Server.cs
  13. 4
      src/csharp/tests.json

@ -0,0 +1,90 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class AppDomainUnloadTest
{
[Test]
public void AppDomainUnloadHookCanCleanupAbandonedCall()
{
var setup = new AppDomainSetup
{
ApplicationBase = AppDomain.CurrentDomain.BaseDirectory
};
var childDomain = AppDomain.CreateDomain("test", null, setup);
var remoteObj = childDomain.CreateInstance(typeof(AppDomainTestClass).Assembly.GetName().Name, typeof(AppDomainTestClass).FullName);
// Try to unload the appdomain once we've created a server and a channel inside the appdomain.
AppDomain.Unload(childDomain);
}
public class AppDomainTestClass
{
const string Host = "127.0.0.1";
/// <summary>
/// Creates a server and a channel and initiates a call. The code is invoked from inside of an AppDomain
/// to test if AppDomain.Unload() work if Grpc is being used.
/// </summary>
public AppDomainTestClass()
{
var helper = new MockServiceHelper(Host);
var server = helper.GetServer();
server.Start();
var channel = helper.GetChannel();
var readyToShutdown = new TaskCompletionSource<object>();
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
readyToShutdown.SetResult(null);
await requestStream.ToListAsync();
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
readyToShutdown.Task.Wait(); // make sure handler is running
}
}
}
}

@ -86,6 +86,10 @@
<Compile Include="NUnitMain.cs" />
<Compile Include="Internal\FakeNativeCall.cs" />
<Compile Include="Internal\AsyncCallServerTest.cs" />
<Compile Include="ShutdownHookServerTest.cs" />
<Compile Include="ShutdownHookPendingCallTest.cs" />
<Compile Include="ShutdownHookClientTest.cs" />
<Compile Include="AppDomainUnloadTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -49,7 +49,7 @@ namespace Grpc.Core.Tests
{
Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
}
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@ -58,8 +58,8 @@ namespace Grpc.Core.Tests
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
Assert.AreSame(env1, env2);
GrpcEnvironment.Release();
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@ -68,10 +68,10 @@ namespace Grpc.Core.Tests
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreNotSame(env1, env2);
}
@ -80,7 +80,7 @@ namespace Grpc.Core.Tests
public void ReleaseWithoutAddRef()
{
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await GrpcEnvironment.ReleaseAsync());
}
[Test]

@ -48,7 +48,7 @@ namespace Grpc.Core.Internal.Tests
GrpcEnvironment.AddRef();
var cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@ -59,7 +59,7 @@ namespace Grpc.Core.Internal.Tests
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type);
Assert.AreNotEqual(IntPtr.Zero, ev.success);
Assert.AreEqual(IntPtr.Zero, ev.tag);

@ -65,7 +65,7 @@ namespace Grpc.Core.Tests
cq.Dispose();
});
GrpcEnvironment.Release();
GrpcEnvironment.ReleaseAsync().Wait();
}
/// <summary>

@ -0,0 +1,57 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownHookClientTest
{
const string Host = "127.0.0.1";
[Test]
public void ProcessExitHookCanCleanupAbandonedChannels()
{
var channel = new Channel(Host, 1000, ChannelCredentials.Insecure);
var channel2 = new Channel(Host, 1001, ChannelCredentials.Insecure);
}
}
}

@ -0,0 +1,69 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownHookPendingCallTest
{
const string Host = "127.0.0.1";
[Test]
public void ProcessExitHookCanCleanupAbandonedCall()
{
var helper = new MockServiceHelper(Host);
var server = helper.GetServer();
server.Start();
var channel = helper.GetChannel();
var readyToShutdown = new TaskCompletionSource<object>();
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
readyToShutdown.SetResult(null);
await requestStream.ToListAsync();
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
readyToShutdown.Task.Wait(); // make sure handler is running
}
}
}

@ -0,0 +1,58 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownHookServerTest
{
const string Host = "127.0.0.1";
[Test]
public void ProcessExitHookCanCleanupAbandonedServers()
{
var helper = new MockServiceHelper(Host);
var server = helper.GetServer();
server.Start();
}
}
}

@ -88,6 +88,7 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
GrpcEnvironment.RegisterChannel(this);
}
/// <summary>
@ -209,6 +210,7 @@ namespace Grpc.Core
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
GrpcEnvironment.UnregisterChannel(this);
shutdownTokenSource.Cancel();
@ -220,7 +222,7 @@ namespace Grpc.Core
handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
internal ChannelSafeHandle Handle

@ -35,6 +35,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -53,12 +54,16 @@ namespace Grpc.Core
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new ConsoleLogger();
readonly object myLock = new object();
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@ -67,6 +72,8 @@ namespace Grpc.Core
/// </summary>
internal static GrpcEnvironment AddRef()
{
ShutdownHooks.Register();
lock (staticLock)
{
refCount++;
@ -79,21 +86,26 @@ namespace Grpc.Core
}
/// <summary>
/// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
/// (and blocks until the environment has been fully shutdown).
/// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
/// </summary>
internal static void Release()
internal static async Task ReleaseAsync()
{
GrpcEnvironment instanceToShutdown = null;
lock (staticLock)
{
GrpcPreconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
instance.Close();
instanceToShutdown = instance;
instance = null;
}
}
if (instanceToShutdown != null)
{
await instanceToShutdown.ShutdownAsync();
}
}
internal static int GetRefCount()
@ -104,6 +116,68 @@ namespace Grpc.Core
}
}
internal static void RegisterChannel(Channel channel)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(channel);
registeredChannels.Add(channel);
}
}
internal static void UnregisterChannel(Channel channel)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(channel);
GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
}
}
internal static void RegisterServer(Server server)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(server);
registeredServers.Add(server);
}
}
internal static void UnregisterServer(Server server)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(server);
GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
}
}
/// <summary>
/// Requests shutdown of all channels created by the current process.
/// </summary>
public static Task ShutdownChannelsAsync()
{
HashSet<Channel> snapshot = null;
lock (staticLock)
{
snapshot = new HashSet<Channel>(registeredChannels);
}
return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
}
/// <summary>
/// Requests immediate shutdown of all servers created by the current process.
/// </summary>
public static Task KillServersAsync()
{
HashSet<Server> snapshot = null;
lock (staticLock)
{
snapshot = new HashSet<Server>(registeredServers);
}
return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
}
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@ -180,6 +254,14 @@ namespace Grpc.Core
}
}
internal bool IsAlive
{
get
{
return this.threadPool.IsAlive;
}
}
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
@ -223,13 +305,13 @@ namespace Grpc.Core
/// <summary>
/// Shuts down this environment.
/// </summary>
private void Close()
private async Task ShutdownAsync()
{
if (isClosed)
{
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
isClosed = true;
@ -257,5 +339,32 @@ namespace Grpc.Core
// by default, create a completion queue for each thread
return GetThreadPoolSizeOrDefault();
}
private static class ShutdownHooks
{
static object staticLock = new object();
static bool hooksRegistered;
public static void Register()
{
lock (staticLock)
{
if (!hooksRegistered)
{
AppDomain.CurrentDomain.ProcessExit += ShutdownHookHandler;
AppDomain.CurrentDomain.DomainUnload += ShutdownHookHandler;
}
hooksRegistered = true;
}
}
/// <summary>
/// Handler for AppDomain.DomainUnload and AppDomain.ProcessExit hooks.
/// </summary>
private static void ShutdownHookHandler(object sender, EventArgs e)
{
Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
}
}
}
}

@ -35,6 +35,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -53,6 +54,8 @@ namespace Grpc.Core.Internal
readonly int poolSize;
readonly int completionQueueCount;
bool stopRequested;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
/// <summary>
@ -84,15 +87,21 @@ namespace Grpc.Core.Internal
}
}
public void Stop()
public Task StopAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
stopRequested = true;
foreach (var cq in completionQueues)
{
cq.Shutdown();
}
}
return Task.Run(() =>
{
foreach (var thread in threads)
{
thread.Join();
@ -102,6 +111,21 @@ namespace Grpc.Core.Internal
{
cq.Dispose();
}
});
}
/// <summary>
/// Returns true if there is at least one thread pool thread that hasn't
/// already stopped.
/// Threads can either stop because all completion queues shut down or
/// because all foreground threads have already shutdown and process is
/// going to exit.
/// </summary>
internal bool IsAlive
{
get
{
return threads.Any(t => t.ThreadState != ThreadState.Stopped);
}
}
@ -119,7 +143,7 @@ namespace Grpc.Core.Internal
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
thread.IsBackground = true;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();

@ -86,6 +86,7 @@ namespace Grpc.Core
{
this.handle.RegisterCompletionQueue(cq);
}
GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@ -155,21 +156,9 @@ namespace Grpc.Core
/// <remarks>
/// It is strongly recommended to shutdown all previously created servers before exiting from the process.
/// </remarks>
public async Task ShutdownAsync()
public Task ShutdownAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckState(startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
return ShutdownInternalAsync(false);
}
/// <summary>
@ -179,22 +168,9 @@ namespace Grpc.Core
/// <remarks>
/// It is strongly recommended to shutdown all previously created servers before exiting from the process.
/// </remarks>
public async Task KillAsync()
public Task KillAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckState(startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
return ShutdownInternalAsync(true);
}
internal void AddCallReference(object call)
@ -212,6 +188,53 @@ namespace Grpc.Core
activeCallCounter.Decrement();
}
/// <summary>
/// Shuts down the server.
/// </summary>
private async Task ShutdownInternalAsync(bool kill)
{
lock (myLock)
{
GrpcPreconditions.CheckState(startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
GrpcEnvironment.UnregisterServer(this);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
if (kill)
{
handle.CancelAllCalls();
}
await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
DisposeHandle();
await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
/// <summary>
/// In case the environment's threadpool becomes dead, the shutdown completion will
/// never be delivered, but we need to release the environment's handle anyway.
/// </summary>
private async Task ShutdownCompleteOrEnvironmentDeadAsync()
{
while (true)
{
var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
if (shutdownTcs.Task == task)
{
return;
}
if (!environment.IsAlive)
{
return;
}
}
}
/// <summary>
/// Adds a service definition.
/// </summary>

@ -7,6 +7,7 @@
"Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
"Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
"Grpc.Core.Internal.Tests.TimespecTest",
"Grpc.Core.Tests.AppDomainUnloadTest",
"Grpc.Core.Tests.CallCredentialsTest",
"Grpc.Core.Tests.CallOptionsTest",
"Grpc.Core.Tests.ChannelCredentialsTest",
@ -25,6 +26,9 @@
"Grpc.Core.Tests.ResponseHeadersTest",
"Grpc.Core.Tests.SanityTest",
"Grpc.Core.Tests.ServerTest",
"Grpc.Core.Tests.ShutdownHookClientTest",
"Grpc.Core.Tests.ShutdownHookPendingCallTest",
"Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest",
"Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest"

Loading…
Cancel
Save