diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 21f94d3cf55..e797dd82f24 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -73,12 +73,6 @@ namespace Grpc.Core.Tests
Server server;
Channel channel;
- [TestFixtureSetUp]
- public void InitClass()
- {
- GrpcEnvironment.Initialize();
- }
-
[SetUp]
public void Init()
{
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 6a132a5b224..9ae12776f3c 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -43,16 +43,17 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
- GrpcEnvironment.Initialize();
- Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue);
+ var env = GrpcEnvironment.GetInstance();
+ Assert.IsNotNull(env.CompletionQueue);
GrpcEnvironment.Shutdown();
}
[Test]
public void SubsequentInvocations()
{
- GrpcEnvironment.Initialize();
- GrpcEnvironment.Initialize();
+ var env1 = GrpcEnvironment.GetInstance();
+ var env2 = GrpcEnvironment.GetInstance();
+ Assert.IsTrue(object.ReferenceEquals(env1, env2));
GrpcEnvironment.Shutdown();
GrpcEnvironment.Shutdown();
}
@@ -60,15 +61,13 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAfterShutdown()
{
- GrpcEnvironment.Initialize();
- var tp1 = GrpcEnvironment.ThreadPool;
+ var env1 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
- GrpcEnvironment.Initialize();
- var tp2 = GrpcEnvironment.ThreadPool;
+ var env2 = GrpcEnvironment.GetInstance();
GrpcEnvironment.Shutdown();
- Assert.IsFalse(object.ReferenceEquals(tp1, tp2));
+ Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index 8b3c9102513..714c2f7494c 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -53,18 +53,6 @@ namespace Grpc.Core.Tests
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
- [TestFixtureSetUp]
- public void Init()
- {
- GrpcEnvironment.Initialize();
- }
-
- [TestFixtureTearDown]
- public void Cleanup()
- {
- GrpcEnvironment.Shutdown();
- }
-
///
/// (~1.26us .NET Windows)
///
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index 02c773c9ccc..1119aa370e5 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -44,13 +44,10 @@ namespace Grpc.Core.Tests
[Test]
public void StartAndShutdownServer()
{
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddListeningPort("localhost", Server.PickUnusedPort);
server.Start();
server.ShutdownAsync().Wait();
-
GrpcEnvironment.Shutdown();
}
}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 9f8baac6841..750282258f2 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -58,7 +58,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
return await asyncResult;
@@ -69,7 +69,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream(asyncCall);
@@ -81,7 +81,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream(asyncCall);
@@ -93,7 +93,7 @@ namespace Grpc.Core
where TResponse : class
{
var asyncCall = new AsyncCall(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
- asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
+ asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
asyncCall.StartDuplexStreamingCall(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream(asyncCall);
@@ -108,13 +108,5 @@ namespace Grpc.Core
token.Register(() => asyncCall.Cancel());
}
}
-
- ///
- /// Gets shared completion queue used for async calls.
- ///
- private static CompletionQueueSafeHandle GetCompletionQueue()
- {
- return GrpcEnvironment.ThreadPool.CompletionQueue;
- }
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index d6bfbb7bc45..5baf2600031 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -42,8 +42,10 @@ namespace Grpc.Core
///
public class Channel : IDisposable
{
+ readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly string target;
+ bool disposed;
///
/// Creates a channel that connects to a specific host.
@@ -54,6 +56,7 @@ namespace Grpc.Core
/// Channel options.
public Channel(string host, Credentials credentials = null, IEnumerable options = null)
{
+ this.environment = GrpcEnvironment.GetInstance();
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
{
if (credentials != null)
@@ -105,10 +108,35 @@ namespace Grpc.Core
}
}
+ internal CompletionQueueSafeHandle CompletionQueue
+ {
+ get
+ {
+ return this.environment.CompletionQueue;
+ }
+ }
+
+ internal CompletionRegistry CompletionRegistry
+ {
+ get
+ {
+ return this.environment.CompletionRegistry;
+ }
+ }
+
+ internal GrpcEnvironment Environment
+ {
+ get
+ {
+ return this.environment;
+ }
+ }
+
protected virtual void Dispose(bool disposing)
{
- if (handle != null && !handle.IsInvalid)
+ if (disposing && handle != null && !disposed)
{
+ disposed = true;
handle.Dispose();
}
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 30ff2897145..47d1651aab8 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -33,7 +33,9 @@
using System;
using System.Runtime.InteropServices;
+using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
@@ -51,20 +53,18 @@ namespace Grpc.Core
static extern void grpcsharp_shutdown();
static object staticLock = new object();
- static volatile GrpcEnvironment instance;
+ static GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
+ readonly DebugStats debugStats = new DebugStats();
bool isClosed;
///
- /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any
- /// effect unless you call Shutdown first.
- /// Although normal use cases assume you will call this just once in your application's
- /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's
- /// allowed to initialize the environment again after it has been successfully shutdown.
+ /// Returns an instance of initialized gRPC environment.
+ /// Subsequent invocations return the same instance unless Shutdown has been called first.
///
- public static void Initialize()
+ internal static GrpcEnvironment GetInstance()
{
lock (staticLock)
{
@@ -72,12 +72,13 @@ namespace Grpc.Core
{
instance = new GrpcEnvironment();
}
+ return instance;
}
}
///
- /// Shuts down the GRPC environment if it was initialized before.
- /// Repeated invocations have no effect.
+ /// Shuts down the gRPC environment if it was initialized before.
+ /// Blocks until the environment has been fully shutdown.
///
public static void Shutdown()
{
@@ -87,50 +88,55 @@ namespace Grpc.Core
{
instance.Close();
instance = null;
-
- CheckDebugStats();
}
}
}
- internal static GrpcThreadPool ThreadPool
+ ///
+ /// Creates gRPC environment.
+ ///
+ private GrpcEnvironment()
+ {
+ GrpcLog.RedirectNativeLogs(Console.Error);
+ grpcsharp_init();
+ completionRegistry = new CompletionRegistry(this);
+ threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
+ threadPool.Start();
+ // TODO: use proper logging here
+ Console.WriteLine("GRPC initialized.");
+ }
+
+ ///
+ /// Gets the completion registry used by this gRPC environment.
+ ///
+ internal CompletionRegistry CompletionRegistry
{
get
{
- var inst = instance;
- if (inst == null)
- {
- throw new InvalidOperationException("GRPC environment not initialized");
- }
- return inst.threadPool;
+ return this.completionRegistry;
}
}
- internal static CompletionRegistry CompletionRegistry
+ ///
+ /// Gets the completion queue used by this gRPC environment.
+ ///
+ internal CompletionQueueSafeHandle CompletionQueue
{
get
{
- var inst = instance;
- if (inst == null)
- {
- throw new InvalidOperationException("GRPC environment not initialized");
- }
- return inst.completionRegistry;
+ return this.threadPool.CompletionQueue;
}
}
///
- /// Creates gRPC environment.
+ /// Gets the completion queue used by this gRPC environment.
///
- private GrpcEnvironment()
+ internal DebugStats DebugStats
{
- GrpcLog.RedirectNativeLogs(Console.Error);
- grpcsharp_init();
- completionRegistry = new CompletionRegistry();
- threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
- threadPool.Start();
- // TODO: use proper logging here
- Console.WriteLine("GRPC initialized.");
+ get
+ {
+ return this.debugStats;
+ }
}
///
@@ -146,32 +152,28 @@ namespace Grpc.Core
grpcsharp_shutdown();
isClosed = true;
+ debugStats.CheckOK();
+
// TODO: use proper logging here
Console.WriteLine("GRPC shutdown.");
}
- private static void CheckDebugStats()
+ ///
+ /// Shuts down this environment asynchronously.
+ ///
+ private Task CloseAsync()
{
- var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
- if (remainingClientCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
- }
- var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
- if (remainingServerCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
- }
- var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
- if (pendingBatchCompletions != 0)
+ return Task.Run(() =>
{
- DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
- }
- }
-
- private static void DebugWarning(string message)
- {
- throw new Exception("Shutdown check: " + message);
+ try
+ {
+ Close();
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Error occured while shutting down GrpcEnvironment: " + e);
+ }
+ });
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index d350f45da6b..24b75d16686 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -47,6 +47,8 @@ namespace Grpc.Core.Internal
///
internal class AsyncCall : AsyncCallBase
{
+ Channel channel;
+
// Completion of a pending unary response if not null.
TaskCompletionSource unaryResponseTcs;
@@ -61,8 +63,9 @@ namespace Grpc.Core.Internal
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
{
- var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture);
- DebugStats.ActiveClientCalls.Increment();
+ this.channel = channel;
+ var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
+ channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
}
@@ -277,7 +280,7 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- DebugStats.ActiveClientCalls.Decrement();
+ channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
///
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 4f510ba40ac..309067ea9de 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -48,14 +48,17 @@ namespace Grpc.Core.Internal
internal class AsyncCallServer : AsyncCallBase
{
readonly TaskCompletionSource
public const int PickUnusedPort = 0;
+ readonly GrpcEnvironment environment;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@@ -67,9 +68,10 @@ namespace Grpc.Core
/// Channel options.
public Server(IEnumerable options = null)
{
+ this.environment = GrpcEnvironment.GetInstance();
using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
{
- this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), channelArgs);
+ this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
}
}
@@ -144,7 +146,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
+ handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
handle.Dispose();
}
@@ -173,7 +175,7 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(GetCompletionQueue(), HandleServerShutdown);
+ handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
handle.Dispose();
@@ -208,7 +210,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
- handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
+ handle.RequestCall(HandleNewServerRpc, environment);
}
}
}
@@ -225,7 +227,7 @@ namespace Grpc.Core
{
callHandler = new NoSuchMethodCallHandler();
}
- await callHandler.HandleCall(method, call, GetCompletionQueue());
+ await callHandler.HandleCall(method, call, environment);
}
catch (Exception e)
{
@@ -259,10 +261,5 @@ namespace Grpc.Core
{
shutdownTcs.SetResult(null);
}
-
- private static CompletionQueueSafeHandle GetCompletionQueue()
- {
- return GrpcEnvironment.ThreadPool.CompletionQueue;
- }
}
}
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index 360fe928dd8..b7637214600 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -39,8 +39,6 @@ namespace math
{
public static void Main(string[] args)
{
- GrpcEnvironment.Initialize();
-
using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index d05e3f28080..f4409851127 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -42,8 +42,6 @@ namespace math
{
string host = "0.0.0.0";
- GrpcEnvironment.Initialize();
-
Server server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, 23456);
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index aadd49f795d..10dceb60aaa 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -54,8 +54,6 @@ namespace math.Tests
[TestFixtureSetUp]
public void Init()
{
- GrpcEnvironment.Initialize();
-
server = new Server();
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
@@ -75,7 +73,6 @@ namespace math.Tests
public void Cleanup()
{
channel.Dispose();
-
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index f0be522bc6e..bdcb2c505c2 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -102,8 +102,6 @@ namespace Grpc.IntegrationTesting
private void Run()
{
- GrpcEnvironment.Initialize();
-
Credentials credentials = null;
if (options.useTls)
{
@@ -135,7 +133,6 @@ namespace Grpc.IntegrationTesting
TestService.ITestServiceClient client = new TestService.TestServiceClient(channel, stubConfig);
RunTestCase(options.testCase, client);
}
-
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 1a733450c1a..6c2da9d2ee6 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -55,8 +55,6 @@ namespace Grpc.IntegrationTesting
[TestFixtureSetUp]
public void Init()
{
- GrpcEnvironment.Initialize();
-
server = new Server();
server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort, TestCredentials.CreateTestServerCredentials());
@@ -74,7 +72,6 @@ namespace Grpc.IntegrationTesting
public void Cleanup()
{
channel.Dispose();
-
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index 87c3cbe1d4c..9475e66c408 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -88,8 +88,6 @@ namespace Grpc.IntegrationTesting
private void Run()
{
- GrpcEnvironment.Initialize();
-
var server = new Server();
server.AddServiceDefinition(TestService.BindService(new TestServiceImpl()));