add support for multiple cqs to GrpcThreadPool

pull/6712/head
Jan Tattermusch 9 years ago
parent 22ca12b83f
commit 5ee8e77522
  1. 5
      src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
  2. 13
      src/csharp/Grpc.Core/Channel.cs
  3. 23
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  4. 8
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  5. 4
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  6. 64
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  7. 10
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  8. 22
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  9. 21
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  10. 32
      src/csharp/Grpc.Core/Server.cs
  11. 11
      src/csharp/ext/grpc_csharp_ext.c

@ -32,7 +32,7 @@
#endregion
using System;
using System.Threading;
using System.Linq;
using Grpc.Core;
using NUnit.Framework;
@ -44,7 +44,8 @@ namespace Grpc.Core.Tests
public void InitializeAndShutdownGrpcEnvironment()
{
var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
Assert.AreEqual(1, env.CompletionQueues.Count);
Assert.IsNotNull(env.CompletionQueues.ElementAt(0));
GrpcEnvironment.Release();
}

@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@ -56,6 +55,7 @@ namespace Grpc.Core
readonly string target;
readonly GrpcEnvironment environment;
readonly CompletionQueueSafeHandle completionQueue;
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
@ -75,6 +75,7 @@ namespace Grpc.Core
EnsureUserAgentChannelOption(this.options);
this.environment = GrpcEnvironment.AddRef();
this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
@ -135,7 +136,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, environment.CompletionRegistry, handler);
return tcs.Task;
}
@ -231,6 +232,14 @@ namespace Grpc.Core
}
}
internal CompletionQueueSafeHandle CompletionQueue
{
get
{
return this.completionQueue;
}
}
internal void AddCallReference(object call)
{
activeCallCounter.Increment();

@ -32,8 +32,9 @@
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -46,6 +47,7 @@ namespace Grpc.Core
public class GrpcEnvironment
{
const int MinDefaultThreadPoolSize = 4;
const int DefaultCompletionQueueCount = 1;
static object staticLock = new object();
static GrpcEnvironment instance;
@ -57,6 +59,7 @@ namespace Grpc.Core
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
bool isClosed;
/// <summary>
@ -147,7 +150,7 @@ namespace Grpc.Core
{
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), DefaultCompletionQueueCount);
threadPool.Start();
}
@ -163,16 +166,26 @@ namespace Grpc.Core
}
/// <summary>
/// Gets the completion queue used by this gRPC environment.
/// Gets the completion queues used by this gRPC environment.
/// </summary>
internal CompletionQueueSafeHandle CompletionQueue
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return this.threadPool.CompletionQueue;
return this.threadPool.CompletionQueues;
}
}
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
/// </summary>
internal CompletionQueueSafeHandle PickCompletionQueue()
{
var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
return this.threadPool.CompletionQueues.ElementAt(cqIndex);
}
/// <summary>
/// Gets the completion queue used by this gRPC environment.
/// </summary>

@ -144,7 +144,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@ -171,7 +171,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
readingDone = true;
@ -195,7 +195,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@ -220,7 +220,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
Initialize(environment.CompletionQueue);
Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{

@ -56,9 +56,9 @@ namespace Grpc.Core.Internal
this.server = GrpcPreconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
call.Initialize(environment.CompletionRegistry, completionQueue);
server.AddCallReference(this);
InitializeInternal(call);

@ -33,15 +33,15 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of threads polling on the same completion queue.
/// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@ -51,25 +51,31 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly int completionQueueCount;
CompletionQueueSafeHandle cq;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
/// <summary>
/// Creates a thread pool threads polling on a set of completions queues.
/// </summary>
/// <param name="environment">Environment.</param>
/// <param name="poolSize">Pool size.</param>
/// <param name="completionQueueCount">Completion queue count.</param>
public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
this.completionQueueCount = completionQueueCount;
GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
"Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
if (cq != null)
{
throw new InvalidOperationException("Already started.");
}
cq = CompletionQueueSafeHandle.Create();
GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
completionQueues = CreateCompletionQueueList(completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@ -82,37 +88,47 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
cq.Shutdown();
foreach (var cq in completionQueues)
{
cq.Shutdown();
}
foreach (var thread in threads)
{
thread.Join();
}
cq.Dispose();
foreach (var cq in completionQueues)
{
cq.Dispose();
}
}
}
internal CompletionQueueSafeHandle CompletionQueue
internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
return cq;
return completionQueues;
}
}
private Thread CreateAndStartThread(int i)
private Thread CreateAndStartThread(int threadIndex)
{
var thread = new Thread(new ThreadStart(RunHandlerLoop));
var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
thread.Start();
thread.Name = "grpc " + i;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void RunHandlerLoop()
private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
@ -135,5 +151,15 @@ namespace Grpc.Core.Internal
}
while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
}
private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(int completionQueueCount)
{
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
list.Add(CompletionQueueSafeHandle.Create());
}
return list.AsReadOnly();
}
}
}

@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@ -493,7 +495,8 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
@ -773,7 +776,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);

@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);

@ -31,12 +31,6 @@
#endregion
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
return Native.grpcsharp_server_create(cq, args);
return Native.grpcsharp_server_create(args);
}
public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
{
Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@ -77,14 +76,14 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.PickCompletionQueue(), ctx);
}
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()

@ -34,8 +34,6 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@ -48,7 +46,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
const int InitialAllowRpcTokenCount = 10;
const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@ -80,7 +78,12 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
this.handle = ServerSafeHandle.NewServer(channelArgs);
}
foreach (var cq in environment.CompletionQueues)
{
this.handle.RegisterCompletionQueue(cq);
}
}
@ -133,9 +136,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
for (int i = 0; i < InitialAllowRpcTokenCount; i++)
for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
AllowOneRpc();
foreach (var cq in environment.CompletionQueues)
{
AllowOneRpc(cq);
}
}
}
}
@ -244,11 +250,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
handle.RequestCall(HandleNewServerRpc, environment);
handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), environment, cq);
}
}
@ -265,7 +271,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task HandleCallAsync(ServerRpcNew newRpc)
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@ -274,7 +280,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
await callHandler.HandleCall(newRpc, environment, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@ -285,9 +291,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
Task.Run(() => AllowOneRpc());
Task.Run(() => AllowOneRpc(cq));
if (success)
{
@ -296,7 +302,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
HandleCallAsync(newRpc); // we don't need to await.
HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}

@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials(
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
grpc_server *server = grpc_server_create(args, NULL);
grpcsharp_server_create(const grpc_channel_args *args) {
return grpc_server_create(args, NULL);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq) {
grpc_server_register_completion_queue(server, cq, NULL);
return server;
}
GPR_EXPORT int32_t GPR_CALLTYPE

Loading…
Cancel
Save