Merge pull request #6712 from jtattermusch/csharp_more_completion_queues

C# add support for multiple completion queues in GrpcThreadPool.
pull/6724/head
Jan Tattermusch 9 years ago
commit 5b521d2cbe
  1. 24
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 10
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  3. 8
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  4. 4
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  5. 10
      src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
  6. 13
      src/csharp/Grpc.Core/Channel.cs
  7. 53
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  8. 12
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  9. 4
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  10. 6
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  11. 26
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  12. 9
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  13. 16
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  14. 68
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  15. 11
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  16. 32
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  17. 27
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  18. 39
      src/csharp/Grpc.Core/Server.cs
  19. 39
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  20. 11
      src/csharp/ext/grpc_csharp_ext.c

@ -235,8 +235,16 @@ namespace Grpc.Core.Tests
await barrier.Task; // make sure the handler has started.
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseAsync;
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
}
[Test]
@ -265,9 +273,15 @@ namespace Grpc.Core.Tests
await handlerStartedBarrier.Task;
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
await call.ResponseAsync;
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Assert.AreEqual("SUCCESS", await successTcs.Task);
}

@ -105,7 +105,15 @@ namespace Grpc.Core.Tests
var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await readyToCancelTcs.Task;
cts.Cancel();
Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await parentCall;
Assert.Fail();
}
catch (RpcException)
{
}
Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task);
}

@ -32,7 +32,7 @@
#endregion
using System;
using System.Threading;
using System.Linq;
using Grpc.Core;
using NUnit.Framework;
@ -44,7 +44,11 @@ namespace Grpc.Core.Tests
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
Assert.IsTrue(env.CompletionQueues.Count > 0);
for (int i = 0; i < env.CompletionQueues.Count; i++)
{
Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
}
GrpcEnvironment.Release();
}

@ -53,8 +53,6 @@ namespace Grpc.Core.Internal.Tests
[SetUp]
public void Init()
{
var environment = GrpcEnvironment.AddRef();
// Create a fake server just so we have an instance to refer to.
// The server won't actually be used at all.
server = new Server()
@ -66,7 +64,6 @@ namespace Grpc.Core.Internal.Tests
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
environment,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
@ -75,7 +72,6 @@ namespace Grpc.Core.Internal.Tests
public void Cleanup()
{
server.ShutdownAsync().Wait();
GrpcEnvironment.Release();
}
[Test]

@ -134,7 +134,15 @@ namespace Grpc.Core.Tests
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext());
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await requestStream.MoveNext();
Assert.Fail();
}
catch (IOException)
{
}
return "RESPONSE";
});

@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -56,6 +55,7 @@ namespace Grpc.Core
readonly string target;
readonly GrpcEnvironment environment;
readonly CompletionQueueSafeHandle completionQueue;
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
@ -75,6 +75,7 @@ namespace Grpc.Core
EnsureUserAgentChannelOption(this.options);
this.environment = GrpcEnvironment.AddRef();
this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
@ -135,7 +136,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
return tcs.Task;
}
@ -231,6 +232,14 @@ namespace Grpc.Core
}
}
internal CompletionQueueSafeHandle CompletionQueue
{
get
{
return this.completionQueue;
}
}
internal void AddCallReference(object call)
{
activeCallCounter.Increment();

@ -32,8 +32,9 @@
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -51,12 +52,13 @@ namespace Grpc.Core
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static ILogger logger = new ConsoleLogger();
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@ -140,37 +142,52 @@ namespace Grpc.Core
}
}
/// <summary>
/// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
/// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
/// </summary>
public static void SetCompletionQueueCount(int completionQueueCount)
{
lock (staticLock)
{
GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
customCompletionQueueCount = completionQueueCount;
}
}
/// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
/// Gets the completion registry used by this gRPC environment.
/// Gets the completion queues used by this gRPC environment.
/// </summary>
internal CompletionRegistry CompletionRegistry
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return this.completionRegistry;
return this.threadPool.CompletionQueues;
}
}
/// <summary>
/// Gets the completion queue used by this gRPC environment.
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
/// </summary>
internal CompletionQueueSafeHandle CompletionQueue
internal CompletionQueueSafeHandle PickCompletionQueue()
{
get
{
return this.threadPool.CompletionQueue;
}
var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
return this.threadPool.CompletionQueues.ElementAt(cqIndex);
}
/// <summary>
@ -230,5 +247,15 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
private int GetCompletionQueueCountOrDefault()
{
if (customCompletionQueueCount.HasValue)
{
return customCompletionQueueCount.Value;
}
// by default, create a completion queue for each thread
return GetThreadPoolSizeOrDefault();
}
}
}

@ -64,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@ -141,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@ -168,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
readingDone = true;
@ -192,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@ -217,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@ -406,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;

@ -58,7 +58,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
protected INativeCall call;
@ -78,11 +77,10 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
this.environment = GrpcPreconditions.CheckNotNull(environment);
}
/// <summary>

@ -51,14 +51,14 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
call.Initialize(completionQueue);
server.AddCallReference(this);
InitializeInternal(call);

@ -47,16 +47,14 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;
private CallSafeHandle()
{
}
public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
public void Initialize(CompletionQueueSafeHandle completionQueue)
{
this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}
@ -70,7 +68,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@ -90,7 +88,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}
@ -100,7 +98,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}
@ -110,7 +108,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}
@ -120,7 +118,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}
@ -130,7 +128,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@ -142,7 +140,7 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
@ -153,7 +151,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@ -163,7 +161,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@ -173,7 +171,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@ -183,7 +181,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}

@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
result.Initialize(registry, cq);
result.Initialize(cq);
return result;
}
}
@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}

@ -45,6 +45,7 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
AtomicCounter shutdownRefcount = new AtomicCounter(1);
CompletionRegistry completionRegistry;
private CompletionQueueSafeHandle()
{
@ -53,7 +54,13 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return Native.grpcsharp_completion_queue_create();
}
public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
{
var cq = Native.grpcsharp_completion_queue_create();
cq.completionRegistry = completionRegistry;
return cq;
}
public CompletionQueueEvent Next()
@ -83,6 +90,15 @@ namespace Grpc.Core.Internal
DecrementShutdownRefcount();
}
/// <summary>
/// Completion registry associated with this completion queue.
/// Doesn't need to be set if only using Pluck() operations.
/// </summary>
public CompletionRegistry CompletionRegistry
{
get { return completionRegistry; }
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_completion_queue_destroy(handle);

@ -33,15 +33,15 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of threads polling on the same completion queue.
/// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@ -51,25 +51,31 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly int completionQueueCount;
CompletionQueueSafeHandle cq;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
/// <summary>
/// Creates a thread pool threads polling on a set of completions queues.
/// </summary>
/// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param>
public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
if (cq != null)
{
throw new InvalidOperationException("Already started.");
}
cq = CompletionQueueSafeHandle.Create();
GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@ -82,37 +88,48 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
cq.Shutdown();
foreach (var cq in completionQueues)
{
cq.Shutdown();
}
foreach (var thread in threads)
{
thread.Join();
}
cq.Dispose();
foreach (var cq in completionQueues)
{
cq.Dispose();
}
}
}
internal CompletionQueueSafeHandle CompletionQueue
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return cq;
return completionQueues;
}
}
private Thread CreateAndStartThread(int i)
private Thread CreateAndStartThread(int threadIndex)
{
var thread = new Thread(new ThreadStart(RunHandlerLoop));
var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
thread.Name = "grpc " + i;
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void RunHandlerLoop()
private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
@ -124,7 +141,7 @@ namespace Grpc.Core.Internal
IntPtr tag = ev.tag;
try
{
var callback = environment.CompletionRegistry.Extract(tag);
var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@ -135,5 +152,16 @@ namespace Grpc.Core.Internal
}
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
}
private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
{
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
var completionRegistry = new CompletionRegistry(environment);
list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
}
return list.AsReadOnly();
}
}
}

@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@ -348,6 +350,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@ -493,7 +496,8 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
@ -773,7 +777,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);

@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment, newRpc.Server);
(payload) => payload, (payload) => payload, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);

@ -31,12 +31,6 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return Native.grpcsharp_server_create(cq, args);
return Native.grpcsharp_server_create(args);
}
public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
{
Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@ -73,18 +72,18 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()

@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@ -48,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
const int InitialAllowRpcTokenCount = 10;
const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@ -80,7 +79,12 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
this.handle = ServerSafeHandle.NewServer(channelArgs);
}
foreach (var cq in environment.CompletionQueues)
{
this.handle.RegisterCompletionQueue(cq);
}
}
@ -133,9 +137,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
for (int i = 0; i < InitialAllowRpcTokenCount; i++)
for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
AllowOneRpc();
foreach (var cq in environment.CompletionQueues)
{
AllowOneRpc(cq);
}
}
}
}
@ -154,7 +161,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@ -174,7 +182,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(HandleServerShutdown, environment);
var cq = environment.CompletionQueues.First(); // any cq will do
handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@ -244,11 +253,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
handle.RequestCall(HandleNewServerRpc, environment);
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@ -265,7 +274,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task HandleCallAsync(ServerRpcNew newRpc)
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@ -274,7 +283,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@ -285,9 +294,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
Task.Run(() => AllowOneRpc());
Task.Run(() => AllowOneRpc(cq));
if (success)
{
@ -296,7 +305,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
HandleCallAsync(newRpc); // we don't need to await.
HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}

@ -471,8 +471,16 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
}
Console.WriteLine("Passed!");
}
@ -497,9 +505,16 @@ namespace Grpc.IntegrationTesting
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
try
{
await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException ex)
{
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
Console.WriteLine("Passed!");
}
@ -577,9 +592,17 @@ namespace Grpc.IntegrationTesting
await call.RequestStream.WriteAsync(request);
await call.RequestStream.CompleteAsync();
var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync());
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
try
{
// cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
await call.ResponseStream.ToListAsync();
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
Assert.AreEqual(echoStatus.Message, e.Status.Detail);
}
}
Console.WriteLine("Passed!");

@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials(
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
grpc_server *server = grpc_server_create(args, NULL);
grpcsharp_server_create(const grpc_channel_args *args) {
return grpc_server_create(args, NULL);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq) {
grpc_server_register_completion_queue(server, cq, NULL);
return server;
}
GPR_EXPORT int32_t GPR_CALLTYPE

Loading…
Cancel
Save