From 5ee8e77522f673611a8cba40ac03a705382ad2b2 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 24 May 2016 16:17:10 -0400 Subject: [PATCH] add support for multiple cqs to GrpcThreadPool --- .../Grpc.Core.Tests/GrpcEnvironmentTest.cs | 5 +- src/csharp/Grpc.Core/Channel.cs | 13 +++- src/csharp/Grpc.Core/GrpcEnvironment.cs | 23 +++++-- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 8 +-- .../Grpc.Core/Internal/AsyncCallServer.cs | 4 +- .../Grpc.Core/Internal/GrpcThreadPool.cs | 64 +++++++++++++------ .../Grpc.Core/Internal/NativeMethods.cs | 10 ++- .../Grpc.Core/Internal/ServerCallHandler.cs | 22 +++---- .../Grpc.Core/Internal/ServerSafeHandle.cs | 21 +++--- src/csharp/Grpc.Core/Server.cs | 32 ++++++---- src/csharp/ext/grpc_csharp_ext.c | 11 ++-- 11 files changed, 138 insertions(+), 75 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index ab12c120cba..ca29667c95b 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -32,7 +32,7 @@ #endregion using System; -using System.Threading; +using System.Linq; using Grpc.Core; using NUnit.Framework; @@ -44,7 +44,8 @@ namespace Grpc.Core.Tests public void InitializeAndShutdownGrpcEnvironment() { var env = GrpcEnvironment.AddRef(); - Assert.IsNotNull(env.CompletionQueue); + Assert.AreEqual(1, env.CompletionQueues.Count); + Assert.IsNotNull(env.CompletionQueues.ElementAt(0)); GrpcEnvironment.Release(); } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 93a6e6a3d95..f2211111f6c 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -31,7 +31,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,6 +55,7 @@ namespace Grpc.Core readonly string target; readonly GrpcEnvironment environment; + readonly CompletionQueueSafeHandle completionQueue; readonly ChannelSafeHandle handle; readonly Dictionary options; @@ -75,6 +75,7 @@ namespace Grpc.Core EnsureUserAgentChannelOption(this.options); this.environment = GrpcEnvironment.AddRef(); + this.completionQueue = this.environment.PickCompletionQueue(); using (var nativeCredentials = credentials.ToNativeCredentials()) using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values)) { @@ -135,7 +136,7 @@ namespace Grpc.Core tcs.SetCanceled(); } }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, environment.CompletionRegistry, handler); return tcs.Task; } @@ -231,6 +232,14 @@ namespace Grpc.Core } } + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return this.completionQueue; + } + } + internal void AddCallReference(object call) { activeCallCounter.Increment(); diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index bee0ef1d62e..2851587567f 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -32,8 +32,9 @@ #endregion using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; -using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -46,6 +47,7 @@ namespace Grpc.Core public class GrpcEnvironment { const int MinDefaultThreadPoolSize = 4; + const int DefaultCompletionQueueCount = 1; static object staticLock = new object(); static GrpcEnvironment instance; @@ -57,6 +59,7 @@ namespace Grpc.Core readonly GrpcThreadPool threadPool; readonly CompletionRegistry completionRegistry; readonly DebugStats debugStats = new DebugStats(); + readonly AtomicCounter cqPickerCounter = new AtomicCounter(); bool isClosed; /// @@ -147,7 +150,7 @@ namespace Grpc.Core { GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), DefaultCompletionQueueCount); threadPool.Start(); } @@ -163,16 +166,26 @@ namespace Grpc.Core } /// - /// Gets the completion queue used by this gRPC environment. + /// Gets the completion queues used by this gRPC environment. /// - internal CompletionQueueSafeHandle CompletionQueue + internal IReadOnlyCollection CompletionQueues { get { - return this.threadPool.CompletionQueue; + return this.threadPool.CompletionQueues; } } + /// + /// Picks a completion queue in a round-robin fashion. + /// Shouldn't be invoked on a per-call basis (used at per-channel basis). + /// + internal CompletionQueueSafeHandle PickCompletionQueue() + { + var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count); + return this.threadPool.CompletionQueues.ElementAt(cqIndex); + } + /// /// Gets the completion queue used by this gRPC environment. /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 55351869b5c..543c7b584f6 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -144,7 +144,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -171,7 +171,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); readingDone = true; @@ -195,7 +195,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; @@ -220,7 +220,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b1566b44a7c..cb9a9d4cd6f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -56,9 +56,9 @@ namespace Grpc.Core.Internal this.server = GrpcPreconditions.CheckNotNull(server); } - public void Initialize(CallSafeHandle call) + public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue) { - call.Initialize(environment.CompletionRegistry, environment.CompletionQueue); + call.Initialize(environment.CompletionRegistry, completionQueue); server.AddCallReference(this); InitializeInternal(call); diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index b538726fa1b..27dcea71188 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,15 +33,15 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading; -using System.Threading.Tasks; using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// - /// Pool of threads polling on the same completion queue. + /// Pool of threads polling on a set of completions queues. /// internal class GrpcThreadPool { @@ -51,25 +51,31 @@ namespace Grpc.Core.Internal readonly object myLock = new object(); readonly List threads = new List(); readonly int poolSize; + readonly int completionQueueCount; - CompletionQueueSafeHandle cq; + IReadOnlyCollection completionQueues; - public GrpcThreadPool(GrpcEnvironment environment, int poolSize) + /// + /// Creates a thread pool threads polling on a set of completions queues. + /// + /// Environment. + /// Pool size. + /// Completion queue count. + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) { this.environment = environment; this.poolSize = poolSize; + this.completionQueueCount = completionQueueCount; + GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, + "Thread pool size cannot be smaller than the number of completion queues used."); } public void Start() { lock (myLock) { - if (cq != null) - { - throw new InvalidOperationException("Already started."); - } - - cq = CompletionQueueSafeHandle.Create(); + GrpcPreconditions.CheckState(completionQueues == null, "Already started."); + completionQueues = CreateCompletionQueueList(completionQueueCount); for (int i = 0; i < poolSize; i++) { @@ -82,37 +88,47 @@ namespace Grpc.Core.Internal { lock (myLock) { - cq.Shutdown(); + foreach (var cq in completionQueues) + { + cq.Shutdown(); + } + foreach (var thread in threads) { thread.Join(); } - cq.Dispose(); + foreach (var cq in completionQueues) + { + cq.Dispose(); + } } } - internal CompletionQueueSafeHandle CompletionQueue + internal IReadOnlyCollection CompletionQueues { get { - return cq; + return completionQueues; } } - private Thread CreateAndStartThread(int i) + private Thread CreateAndStartThread(int threadIndex) { - var thread = new Thread(new ThreadStart(RunHandlerLoop)); + var cqIndex = threadIndex % completionQueues.Count; + var cq = completionQueues.ElementAt(cqIndex); + + var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq))); thread.IsBackground = false; thread.Start(); - thread.Name = "grpc " + i; + thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex); return thread; } /// /// Body of the polling thread. /// - private void RunHandlerLoop() + private void RunHandlerLoop(CompletionQueueSafeHandle cq) { CompletionQueueEvent ev; do @@ -135,5 +151,15 @@ namespace Grpc.Core.Internal } while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); } + + private static IReadOnlyCollection CreateCompletionQueueList(int completionQueueCount) + { + var list = new List(); + for (int i = 0; i < completionQueueCount; i++) + { + list.Add(CompletionQueueSafeHandle.Create()); + } + return list.AsReadOnly(); + } } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 42fd4d4dc68..786b2252467 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -137,6 +137,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release; public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create; + public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue; public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port; public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port; public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start; @@ -244,6 +245,7 @@ namespace Grpc.Core.Internal this.grpcsharp_server_credentials_release = GetMethodDelegate(library); this.grpcsharp_server_create = GetMethodDelegate(library); + this.grpcsharp_server_register_completion_queue = GetMethodDelegate(library); this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate(library); this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate(library); this.grpcsharp_server_start = GetMethodDelegate(library); @@ -493,7 +495,8 @@ namespace Grpc.Core.Internal public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth); public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials); - public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args); + public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq); public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr); public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server); @@ -773,7 +776,10 @@ namespace Grpc.Core.Internal // ServerSafeHandle [DllImport("grpc_csharp_ext.dll")] - public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args); + + [DllImport("grpc_csharp_ext.dll")] + public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq); [DllImport("grpc_csharp_ext.dll")] public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index febebba209d..e4c9acc0999 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -44,7 +44,7 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment); + Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq); } internal class UnaryServerCallHandler : IServerCallHandler @@ -62,14 +62,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCall); var responseStream = new ServerResponseStream(asyncCall); @@ -121,14 +121,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCall); var responseStream = new ServerResponseStream(asyncCall); @@ -179,14 +179,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCall); var responseStream = new ServerResponseStream(asyncCall); @@ -237,14 +237,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream(asyncCall); var responseStream = new ServerResponseStream(asyncCall); @@ -281,13 +281,13 @@ namespace Grpc.Core.Internal { public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler(); - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment, CompletionQueueSafeHandle cq) { // We don't care about the payload type here. var asyncCall = new AsyncCallServer( (payload) => payload, (payload) => payload, environment, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 6b5f70e2207..191b412669e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -31,12 +31,6 @@ #endregion -using System; -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Runtime.InteropServices; -using Grpc.Core.Utils; - namespace Grpc.Core.Internal { /// @@ -50,12 +44,17 @@ namespace Grpc.Core.Internal { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) + public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args) { // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. // Doing so would make object finalizer crash if we end up abandoning the handle. GrpcEnvironment.GrpcNativeInit(); - return Native.grpcsharp_server_create(cq, args); + return Native.grpcsharp_server_create(args); + } + + public void RegisterCompletionQueue(CompletionQueueSafeHandle cq) + { + Native.grpcsharp_server_register_completion_queue(this, cq); } public int AddInsecurePort(string addr) @@ -77,14 +76,14 @@ namespace Grpc.Core.Internal { var ctx = BatchContextSafeHandle.Create(); environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx); + Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.PickCompletionQueue(), ctx); } - public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk(); + Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } protected override bool ReleaseHandle() diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index d538a4671f6..6507f6e6722 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,8 +34,6 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -48,7 +46,7 @@ namespace Grpc.Core /// public class Server { - const int InitialAllowRpcTokenCount = 10; + const int InitialAllowRpcTokenCountPerCq = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -80,7 +78,12 @@ namespace Grpc.Core this.options = options != null ? new List(options) : new List(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { - this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); + this.handle = ServerSafeHandle.NewServer(channelArgs); + } + + foreach (var cq in environment.CompletionQueues) + { + this.handle.RegisterCompletionQueue(cq); } } @@ -133,9 +136,12 @@ namespace Grpc.Core // Starting with more than one AllowOneRpc tokens can significantly increase // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCount; i++) + for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) { - AllowOneRpc(); + foreach (var cq in environment.CompletionQueues) + { + AllowOneRpc(cq); + } } } } @@ -244,11 +250,11 @@ namespace Grpc.Core /// /// Allows one new RPC call to be received by server. /// - private void AllowOneRpc() + private void AllowOneRpc(CompletionQueueSafeHandle cq) { if (!shutdownRequested) { - handle.RequestCall(HandleNewServerRpc, environment); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), environment, cq); } } @@ -265,7 +271,7 @@ namespace Grpc.Core /// /// Selects corresponding handler for given call and handles the call. /// - private async Task HandleCallAsync(ServerRpcNew newRpc) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { try { @@ -274,7 +280,7 @@ namespace Grpc.Core { callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false); + await callHandler.HandleCall(newRpc, environment, cq).ConfigureAwait(false); } catch (Exception e) { @@ -285,9 +291,9 @@ namespace Grpc.Core /// /// Handles the native callback. /// - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) + private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc()); + Task.Run(() => AllowOneRpc(cq)); if (success) { @@ -296,7 +302,7 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc); // we don't need to await. + HandleCallAsync(newRpc, cq); // we don't need to await. } } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 5b8ff9b819b..4beef9ded89 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials( /* Server */ GPR_EXPORT grpc_server *GPR_CALLTYPE -grpcsharp_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - grpc_server *server = grpc_server_create(args, NULL); +grpcsharp_server_create(const grpc_channel_args *args) { + return grpc_server_create(args, NULL); +} + +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { grpc_server_register_completion_queue(server, cq, NULL); - return server; } GPR_EXPORT int32_t GPR_CALLTYPE