support GrpcEnvironment.KillServersAsync

pull/6754/head
Jan Tattermusch 9 years ago
parent 63386a1064
commit 739ee1b159
  1. 40
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  2. 15
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  3. 26
      src/csharp/Grpc.Core/Server.cs

@ -55,6 +55,7 @@ namespace Grpc.Core
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new ConsoleLogger();
@ -131,6 +132,24 @@ namespace Grpc.Core
}
}
internal static void RegisterServer(Server server)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(server);
registeredServers.Add(server);
}
}
internal static void UnregisterServer(Server server)
{
lock (staticLock)
{
GrpcPreconditions.CheckNotNull(server);
GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
}
}
/// <summary>
/// Requests shutdown of all channels created by the current process.
/// </summary>
@ -144,6 +163,19 @@ namespace Grpc.Core
return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
}
/// <summary>
/// Requests immediate shutdown of all servers created by the current process.
/// </summary>
public static Task KillServersAsync()
{
HashSet<Server> snapshot = null;
lock (staticLock)
{
snapshot = new HashSet<Server>(registeredServers);
}
return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
}
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@ -220,6 +252,14 @@ namespace Grpc.Core
}
}
internal bool IsAlive
{
get
{
return this.threadPool.IsAlive;
}
}
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).

@ -114,6 +114,21 @@ namespace Grpc.Core.Internal
});
}
/// <summary>
/// Returns true if there is at least one thread pool thread that hasn't
/// already stopped.
/// Threads can either stop because all completion queues shut down or
/// because all foreground threads have already shutdown and process is
/// going to exit.
/// </summary>
internal bool IsAlive
{
get
{
return threads.Any(t => t.ThreadState != ThreadState.Stopped);
}
}
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get

@ -86,6 +86,7 @@ namespace Grpc.Core
{
this.handle.RegisterCompletionQueue(cq);
}
GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@ -198,6 +199,7 @@ namespace Grpc.Core
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
GrpcEnvironment.UnregisterServer(this);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
@ -205,12 +207,34 @@ namespace Grpc.Core
{
handle.CancelAllCalls();
}
await shutdownTcs.Task.ConfigureAwait(false);
await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
DisposeHandle();
await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
/// <summary>
/// In case the environment's threadpool becomes dead, the shutdown completion will
/// never be delivered, but we need to release the environment's handle anyway.
/// </summary>
private async Task ShutdownCompleteOrEnvironmentDeadAsync()
{
while (true)
{
var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
if (shutdownTcs.Task == task)
{
return;
}
if (!environment.IsAlive)
{
return;
}
}
}
/// <summary>
/// Adds a service definition.
/// </summary>

Loading…
Cancel
Save