get rid of explicit GrpcEnvironment.Shutdown()

pull/3018/head
Jan Tattermusch 9 years ago
parent 10cab1396f
commit 2b3579541b
  1. 27
      src/csharp/Grpc.Core.Tests/ChannelTest.cs
  2. 15
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  3. 8
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  4. 8
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  5. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  6. 26
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  7. 8
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  8. 5
      src/csharp/Grpc.Core.Tests/ServerTest.cs
  9. 77
      src/csharp/Grpc.Core.Tests/ShutdownTest.cs
  10. 8
      src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
  11. 48
      src/csharp/Grpc.Core/Channel.cs
  12. 37
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  13. 8
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  14. 6
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  15. 17
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  16. 16
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  17. 7
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  18. 14
      src/csharp/Grpc.Core/Internal/DebugStats.cs
  19. 3
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  20. 10
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  21. 4
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  22. 14
      src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
  23. 57
      src/csharp/Grpc.Core/Server.cs
  24. 6
      src/csharp/Grpc.Examples.MathClient/MathClient.cs
  25. 1
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  26. 3
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  27. 3
      src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
  28. 6
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  29. 3
      src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
  30. 2
      src/csharp/Grpc.IntegrationTesting/InteropServer.cs
  31. 3
      src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs

@ -41,12 +41,6 @@ namespace Grpc.Core.Tests
{ {
public class ChannelTest public class ChannelTest
{ {
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public void Constructor_RejectsInvalidParams() public void Constructor_RejectsInvalidParams()
{ {
@ -56,36 +50,33 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void State_IdleAfterCreation() public void State_IdleAfterCreation()
{ {
using (var channel = new Channel("localhost", Credentials.Insecure)) var channel = new Channel("localhost", Credentials.Insecure);
{
Assert.AreEqual(ChannelState.Idle, channel.State); Assert.AreEqual(ChannelState.Idle, channel.State);
} channel.ShutdownAsync().Wait();
} }
[Test] [Test]
public void WaitForStateChangedAsync_InvalidArgument() public void WaitForStateChangedAsync_InvalidArgument()
{ {
using (var channel = new Channel("localhost", Credentials.Insecure)) var channel = new Channel("localhost", Credentials.Insecure);
{
Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
} channel.ShutdownAsync().Wait();
} }
[Test] [Test]
public void ResolvedTarget() public void ResolvedTarget()
{ {
using (var channel = new Channel("127.0.0.1", Credentials.Insecure)) var channel = new Channel("127.0.0.1", Credentials.Insecure);
{
Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
} channel.ShutdownAsync().Wait();
} }
[Test] [Test]
public void Dispose_IsIdempotent() public void Shutdown_AllowedOnlyOnce()
{ {
var channel = new Channel("localhost", Credentials.Insecure); var channel = new Channel("localhost", Credentials.Insecure);
channel.Dispose(); channel.ShutdownAsync().Wait();
channel.Dispose(); Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult());
} }
} }
} }

@ -63,16 +63,10 @@ namespace Grpc.Core.Tests
[TearDown] [TearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
} }
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public async Task UnaryCall() public async Task UnaryCall()
{ {
@ -207,13 +201,6 @@ namespace Grpc.Core.Tests
CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes); CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes);
} }
[Test]
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test] [Test]
public void UnaryCallPerformance() public void UnaryCallPerformance()
{ {

@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown] [TearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
} }
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public void WriteOptions_Unary() public void WriteOptions_Unary()
{ {

@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown] [TearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
} }
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public async Task PropagateCancellation() public async Task PropagateCancellation()
{ {

@ -64,6 +64,7 @@
<Link>Version.cs</Link> <Link>Version.cs</Link>
</Compile> </Compile>
<Compile Include="ClientBaseTest.cs" /> <Compile Include="ClientBaseTest.cs" />
<Compile Include="ShutdownTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" /> <Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" /> <Compile Include="ServerTest.cs" />

@ -43,33 +43,39 @@ namespace Grpc.Core.Tests
[Test] [Test]
public void InitializeAndShutdownGrpcEnvironment() public void InitializeAndShutdownGrpcEnvironment()
{ {
var env = GrpcEnvironment.GetInstance(); var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue); Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown(); GrpcEnvironment.Release();
} }
[Test] [Test]
public void SubsequentInvocations() public void SubsequentInvocations()
{ {
var env1 = GrpcEnvironment.GetInstance(); var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.GetInstance(); var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2)); Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown(); GrpcEnvironment.Release();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Release();
} }
[Test] [Test]
public void InitializeAfterShutdown() public void InitializeAfterShutdown()
{ {
var env1 = GrpcEnvironment.GetInstance(); var env1 = GrpcEnvironment.AddRef();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Release();
var env2 = GrpcEnvironment.GetInstance(); var env2 = GrpcEnvironment.AddRef();
GrpcEnvironment.Shutdown(); GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2)); Assert.IsFalse(object.ReferenceEquals(env1, env2));
} }
[Test]
public void ReleaseWithoutAddRef()
{
Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
}
[Test] [Test]
public void GetCoreVersionString() public void GetCoreVersionString()
{ {

@ -69,16 +69,10 @@ namespace Grpc.Core.Tests
[TearDown] [TearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
} }
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public void WriteResponseHeaders_NullNotAllowed() public void WriteResponseHeaders_NullNotAllowed()
{ {

@ -51,7 +51,6 @@ namespace Grpc.Core.Tests
}; };
server.Start(); server.Start();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]
@ -67,8 +66,7 @@ namespace Grpc.Core.Tests
Assert.Greater(boundPort.BoundPort, 0); Assert.Greater(boundPort.BoundPort, 0);
server.Start(); server.Start();
server.ShutdownAsync(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]
@ -83,7 +81,6 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build())); Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
} }
} }

@ -0,0 +1,77 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ShutdownTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper(Host);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[Test]
public async Task AbandonedCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToListAsync();
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1))));
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
}
}

@ -65,16 +65,10 @@ namespace Grpc.Core.Tests
[TearDown] [TearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
} }
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test] [Test]
public void InfiniteDeadline() public void InfiniteDeadline()
{ {

@ -45,14 +45,19 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// gRPC Channel /// gRPC Channel
/// </summary> /// </summary>
public class Channel : IDisposable public class Channel
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly string target; readonly string target;
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle; readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options; readonly List<ChannelOption> options;
bool shutdownRequested;
bool disposed; bool disposed;
/// <summary> /// <summary>
@ -65,7 +70,7 @@ namespace Grpc.Core
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null) public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{ {
this.target = Preconditions.CheckNotNull(target, "target"); this.target = Preconditions.CheckNotNull(target, "target");
this.environment = GrpcEnvironment.GetInstance(); this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options); EnsureUserAgentChannelOption(this.options);
@ -172,12 +177,26 @@ namespace Grpc.Core
} }
/// <summary> /// <summary>
/// Destroys the underlying channel. /// Waits until there are no more active calls for this channel and then cleans up
/// resources used by this channel.
/// </summary> /// </summary>
public void Dispose() public async Task ShutdownAsync()
{
lock (myLock)
{
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{ {
Dispose(true); Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
GC.SuppressFinalize(this); }
handle.Dispose();
await Task.Run(() => GrpcEnvironment.Release());
} }
internal ChannelSafeHandle Handle internal ChannelSafeHandle Handle
@ -196,13 +215,20 @@ namespace Grpc.Core
} }
} }
protected virtual void Dispose(bool disposing) internal void AddCallReference(object call)
{ {
if (disposing && handle != null && !disposed) activeCallCounter.Increment();
{
disposed = true; bool success = false;
handle.Dispose(); handle.DangerousAddRef(ref success);
Preconditions.CheckState(success);
} }
internal void RemoveCallReference(object call)
{
handle.DangerousRelease();
activeCallCounter.Decrement();
} }
private static void EnsureUserAgentChannelOption(List<ChannelOption> options) private static void EnsureUserAgentChannelOption(List<ChannelOption> options)

@ -58,6 +58,7 @@ namespace Grpc.Core
static object staticLock = new object(); static object staticLock = new object();
static GrpcEnvironment instance; static GrpcEnvironment instance;
static int refCount;
static ILogger logger = new ConsoleLogger(); static ILogger logger = new ConsoleLogger();
@ -67,13 +68,14 @@ namespace Grpc.Core
bool isClosed; bool isClosed;
/// <summary> /// <summary>
/// Returns an instance of initialized gRPC environment. /// Returns a reference-counted instance of initialized gRPC environment.
/// Subsequent invocations return the same instance unless Shutdown has been called first. /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary> /// </summary>
internal static GrpcEnvironment GetInstance() internal static GrpcEnvironment AddRef()
{ {
lock (staticLock) lock (staticLock)
{ {
refCount++;
if (instance == null) if (instance == null)
{ {
instance = new GrpcEnvironment(); instance = new GrpcEnvironment();
@ -83,14 +85,16 @@ namespace Grpc.Core
} }
/// <summary> /// <summary>
/// Shuts down the gRPC environment if it was initialized before. /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
/// Blocks until the environment has been fully shutdown. /// (and blocks until the environment has been fully shutdown).
/// </summary> /// </summary>
public static void Shutdown() internal static void Release()
{ {
lock (staticLock) lock (staticLock)
{ {
if (instance != null) Preconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{ {
instance.Close(); instance.Close();
instance = null; instance = null;
@ -125,12 +129,10 @@ namespace Grpc.Core
private GrpcEnvironment() private GrpcEnvironment()
{ {
NativeLogRedirector.Redirect(); NativeLogRedirector.Redirect();
grpcsharp_init(); GrpcNativeInit();
completionRegistry = new CompletionRegistry(this); completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start(); threadPool.Start();
// TODO: use proper logging here
Logger.Info("gRPC initialized.");
} }
/// <summary> /// <summary>
@ -175,6 +177,17 @@ namespace Grpc.Core
return Marshal.PtrToStringAnsi(ptr); return Marshal.PtrToStringAnsi(ptr);
} }
internal static void GrpcNativeInit()
{
grpcsharp_init();
}
internal static void GrpcNativeShutdown()
{
grpcsharp_shutdown();
}
/// <summary> /// <summary>
/// Shuts down this environment. /// Shuts down this environment.
/// </summary> /// </summary>
@ -185,12 +198,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called"); throw new InvalidOperationException("Close has already been called");
} }
threadPool.Stop(); threadPool.Stop();
grpcsharp_shutdown(); GrpcNativeShutdown();
isClosed = true; isClosed = true;
debugStats.CheckOK(); debugStats.CheckOK();
Logger.Info("gRPC shutdown.");
} }
} }
} }

@ -311,9 +311,9 @@ namespace Grpc.Core.Internal
} }
} }
protected override void OnReleaseResources() protected override void OnAfterReleaseResources()
{ {
details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); details.Channel.RemoveCallReference(this);
} }
private void Initialize(CompletionQueueSafeHandle cq) private void Initialize(CompletionQueueSafeHandle cq)
@ -323,7 +323,9 @@ namespace Grpc.Core.Internal
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq, parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
details.Channel.AddCallReference(this);
InitializeInternal(call); InitializeInternal(call);
RegisterCancellationCallback(); RegisterCancellationCallback();
} }

@ -189,15 +189,15 @@ namespace Grpc.Core.Internal
private void ReleaseResources() private void ReleaseResources()
{ {
OnReleaseResources();
if (call != null) if (call != null)
{ {
call.Dispose(); call.Dispose();
} }
disposed = true; disposed = true;
OnAfterReleaseResources();
} }
protected virtual void OnReleaseResources() protected virtual void OnAfterReleaseResources()
{ {
} }
@ -212,7 +212,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
} }
protected void CheckReadingAllowed() protected virtual void CheckReadingAllowed()
{ {
Preconditions.CheckState(started); Preconditions.CheckState(started);
Preconditions.CheckState(!disposed); Preconditions.CheckState(!disposed);

@ -50,16 +50,19 @@ namespace Grpc.Core.Internal
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
{ {
this.environment = Preconditions.CheckNotNull(environment); this.environment = Preconditions.CheckNotNull(environment);
this.server = Preconditions.CheckNotNull(server);
} }
public void Initialize(CallSafeHandle call) public void Initialize(CallSafeHandle call)
{ {
call.SetCompletionRegistry(environment.CompletionRegistry); call.SetCompletionRegistry(environment.CompletionRegistry);
environment.DebugStats.ActiveServerCalls.Increment();
server.AddCallReference(this);
InitializeInternal(call); InitializeInternal(call);
} }
@ -168,9 +171,15 @@ namespace Grpc.Core.Internal
} }
} }
protected override void OnReleaseResources() protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
Preconditions.CheckArgument(!cancelRequested);
}
protected override void OnAfterReleaseResources()
{ {
environment.DebugStats.ActiveServerCalls.Decrement(); server.RemoveCallReference(this);
} }
/// <summary> /// <summary>

@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
} }
// Gets data of server_rpc_new completion. // Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew() public ServerRpcNew GetServerRpcNew(Server server)
{ {
var call = grpcsharp_batch_context_server_rpc_new_call(this); var call = grpcsharp_batch_context_server_rpc_new_call(this);
@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(call, method, host, deadline, metadata); return new ServerRpcNew(server, call, method, host, deadline, metadata);
} }
// Gets data of receive_close_on_server completion. // Gets data of receive_close_on_server completion.
@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
internal struct ServerRpcNew internal struct ServerRpcNew
{ {
readonly Server server;
readonly CallSafeHandle call; readonly CallSafeHandle call;
readonly string method; readonly string method;
readonly string host; readonly string host;
readonly Timespec deadline; readonly Timespec deadline;
readonly Metadata requestMetadata; readonly Metadata requestMetadata;
public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{ {
this.server = server;
this.call = call; this.call = call;
this.method = method; this.method = method;
this.host = host; this.host = host;
@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata; this.requestMetadata = requestMetadata;
} }
public Server Server
{
get
{
return this.server;
}
}
public CallSafeHandle Call public CallSafeHandle Call
{ {
get get

@ -68,11 +68,17 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs) public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{ {
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs); return grpcsharp_insecure_channel_create(target, channelArgs);
} }
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs) public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{ {
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs); return grpcsharp_secure_channel_create(credentials, target, channelArgs);
} }
@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle() protected override bool ReleaseHandle()
{ {
grpcsharp_channel_destroy(handle); grpcsharp_channel_destroy(handle);
GrpcEnvironment.GrpcNativeShutdown();
return true; return true;
} }
} }

@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{ {
internal class DebugStats internal class DebugStats
{ {
public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary> /// <summary>
@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary> /// </summary>
public void CheckOK() public void CheckOK()
{ {
var remainingClientCalls = ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = PendingBatchCompletions.Count; var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0) if (pendingBatchCompletions != 0)
{ {

@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock) lock (myLock)
{ {
cq.Shutdown(); cq.Shutdown();
Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads) foreach (var thread in threads)
{ {
thread.Join(); thread.Join();
@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
} }
} }
while (ev.type != GRPCCompletionType.Shutdown); while (ev.type != GRPCCompletionType.Shutdown);
Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
} }
} }
} }

@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.Deserializer,
environment); environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.Deserializer,
environment); environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.Deserializer,
environment); environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>( var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer, method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer, method.RequestMarshaller.Deserializer,
environment); environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();
@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{ {
// We don't care about the payload type here. // We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>( var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment); (payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call); asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync(); var finishedTask = asyncCall.ServerSideCallAsync();

@ -74,6 +74,9 @@ namespace Grpc.Core.Internal
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{ {
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return grpcsharp_server_create(cq, args); return grpcsharp_server_create(cq, args);
} }
@ -109,6 +112,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle() protected override bool ReleaseHandle()
{ {
grpcsharp_server_destroy(handle); grpcsharp_server_destroy(handle);
GrpcEnvironment.GrpcNativeShutdown();
return true; return true;
} }

@ -51,7 +51,19 @@ namespace Grpc.Core.Logging
private ConsoleLogger(Type forType) private ConsoleLogger(Type forType)
{ {
this.forType = forType; this.forType = forType;
this.forTypeString = forType != null ? forType.FullName + " " : ""; if (forType != null)
{
var namespaceStr = forType.Namespace ?? "";
if (namespaceStr.Length > 0)
{
namespaceStr += ".";
}
this.forTypeString = namespaceStr + forType.Name + " ";
}
else
{
this.forTypeString = "";
}
} }
/// <summary> /// <summary>

@ -50,6 +50,8 @@ namespace Grpc.Core
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
readonly ServiceDefinitionCollection serviceDefinitions; readonly ServiceDefinitionCollection serviceDefinitions;
readonly ServerPortCollection ports; readonly ServerPortCollection ports;
readonly GrpcEnvironment environment; readonly GrpcEnvironment environment;
@ -73,7 +75,7 @@ namespace Grpc.Core
{ {
this.serviceDefinitions = new ServiceDefinitionCollection(this); this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this); this.ports = new ServerPortCollection(this);
this.environment = GrpcEnvironment.GetInstance(); this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{ {
@ -105,6 +107,17 @@ namespace Grpc.Core
} }
} }
/// <summary>
/// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
}
/// <summary> /// <summary>
/// Starts the server. /// Starts the server.
/// </summary> /// </summary>
@ -136,18 +149,9 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task; await shutdownTcs.Task;
handle.Dispose(); DisposeHandle();
}
/// <summary> await Task.Run(() => GrpcEnvironment.Release());
/// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
} }
/// <summary> /// <summary>
@ -166,7 +170,22 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls(); handle.CancelAllCalls();
await shutdownTcs.Task; await shutdownTcs.Task;
handle.Dispose(); DisposeHandle();
}
internal void AddCallReference(object call)
{
activeCallCounter.Increment();
bool success = false;
handle.DangerousAddRef(ref success);
Preconditions.CheckState(success);
}
internal void RemoveCallReference(object call)
{
handle.DangerousRelease();
activeCallCounter.Decrement();
} }
/// <summary> /// <summary>
@ -227,6 +246,16 @@ namespace Grpc.Core
} }
} }
private void DisposeHandle()
{
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
}
handle.Dispose();
}
/// <summary> /// <summary>
/// Selects corresponding handler for given call and handles the call. /// Selects corresponding handler for given call and handles the call.
/// </summary> /// </summary>
@ -254,7 +283,7 @@ namespace Grpc.Core
{ {
if (success) if (success)
{ {
ServerRpcNew newRpc = ctx.GetServerRpcNew(); ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call // after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid) if (!newRpc.Call.IsInvalid)

@ -39,8 +39,7 @@ namespace math
{ {
public static void Main(string[] args) public static void Main(string[] args)
{ {
using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure)) var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure);
{
Math.IMathClient client = new Math.MathClient(channel); Math.IMathClient client = new Math.MathClient(channel);
MathExamples.DivExample(client); MathExamples.DivExample(client);
@ -53,9 +52,8 @@ namespace math
MathExamples.DivManyExample(client).Wait(); MathExamples.DivManyExample(client).Wait();
MathExamples.DependendRequestsExample(client).Wait(); MathExamples.DependendRequestsExample(client).Wait();
}
GrpcEnvironment.Shutdown(); channel.ShutdownAsync().Wait();
} }
} }
} }

@ -56,7 +56,6 @@ namespace math
Console.ReadKey(); Console.ReadKey();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
} }
} }

@ -68,9 +68,8 @@ namespace math.Tests
[TestFixtureTearDown] [TestFixtureTearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]

@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests
[TestFixtureTearDown] [TestFixtureTearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]

@ -120,12 +120,10 @@ namespace Grpc.IntegrationTesting
}; };
} }
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions);
{
TestService.TestServiceClient client = new TestService.TestServiceClient(channel); TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
await RunTestCaseAsync(options.testCase, client); await RunTestCaseAsync(options.testCase, client);
} channel.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client) private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client)

@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown] [TestFixtureTearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]

@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting
server.Start(); server.Start();
server.ShutdownTask.Wait(); server.ShutdownTask.Wait();
GrpcEnvironment.Shutdown();
} }
private static ServerOptions ParseArguments(string[] args) private static ServerOptions ParseArguments(string[] args)

@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown] [TestFixtureTearDown]
public void Cleanup() public void Cleanup()
{ {
channel.Dispose(); channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait(); server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
} }
[Test] [Test]

Loading…
Cancel
Save