diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 622813fb381..e75dc9faf1e 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -138,6 +138,7 @@ + diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index c28a6f64d39..26449ee5393 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -93,21 +93,6 @@ namespace Grpc.Core.Internal return data; } - // Gets data of server_rpc_new completion. - public ServerRpcNew GetServerRpcNew(Server server) - { - var call = Native.grpcsharp_batch_context_server_rpc_new_call(this); - - var method = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_method(this)); - var host = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_host(this)); - var deadline = Native.grpcsharp_batch_context_server_rpc_new_deadline(this); - - IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_server_rpc_new_request_metadata(this); - var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); - - return new ServerRpcNew(server, call, method, host, deadline, metadata); - } - // Gets data of receive_close_on_server completion. public bool GetReceivedCloseOnServerCancelled() { diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index 628844f2422..7e2f0e9c6c9 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -44,6 +44,8 @@ namespace Grpc.Core.Internal internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx); + internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx); + internal class CompletionRegistry { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); @@ -68,6 +70,12 @@ namespace Grpc.Core.Internal Register(ctx.Handle, opCallback); } + public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) + { + OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback)); + Register(ctx.Handle, opCallback); + } + public OpCompletionDelegate Extract(IntPtr key) { OpCompletionDelegate value; @@ -84,7 +92,26 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate."); + Logger.Error(e, "Exception occured while invoking batch completion delegate."); + } + finally + { + if (ctx != null) + { + ctx.Dispose(); + } + } + } + + private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) + { + try + { + callback(success, ctx); + } + catch (Exception e) + { + Logger.Error(e, "Exception occured while invoking request call completion delegate."); } finally { diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index f457c9dbf1e..40ba7e30cb0 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -64,14 +64,17 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; - public readonly Delegates.grpcsharp_batch_context_server_rpc_new_call_delegate grpcsharp_batch_context_server_rpc_new_call; - public readonly Delegates.grpcsharp_batch_context_server_rpc_new_method_delegate grpcsharp_batch_context_server_rpc_new_method; - public readonly Delegates.grpcsharp_batch_context_server_rpc_new_host_delegate grpcsharp_batch_context_server_rpc_new_host; - public readonly Delegates.grpcsharp_batch_context_server_rpc_new_deadline_delegate grpcsharp_batch_context_server_rpc_new_deadline; - public readonly Delegates.grpcsharp_batch_context_server_rpc_new_request_metadata_delegate grpcsharp_batch_context_server_rpc_new_request_metadata; public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled; public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy; + public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create; + public readonly Delegates.grpcsharp_request_call_context_call_delegate grpcsharp_request_call_context_call; + public readonly Delegates.grpcsharp_request_call_context_method_delegate grpcsharp_request_call_context_method; + public readonly Delegates.grpcsharp_request_call_context_host_delegate grpcsharp_request_call_context_host; + public readonly Delegates.grpcsharp_request_call_context_deadline_delegate grpcsharp_request_call_context_deadline; + public readonly Delegates.grpcsharp_request_call_context_request_metadata_delegate grpcsharp_request_call_context_request_metadata; + public readonly Delegates.grpcsharp_request_call_context_destroy_delegate grpcsharp_request_call_context_destroy; + public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create; public readonly Delegates.grpcsharp_call_credentials_release_delegate grpcsharp_call_credentials_release; @@ -170,14 +173,17 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate(library); - this.grpcsharp_batch_context_server_rpc_new_call = GetMethodDelegate(library); - this.grpcsharp_batch_context_server_rpc_new_method = GetMethodDelegate(library); - this.grpcsharp_batch_context_server_rpc_new_host = GetMethodDelegate(library); - this.grpcsharp_batch_context_server_rpc_new_deadline = GetMethodDelegate(library); - this.grpcsharp_batch_context_server_rpc_new_request_metadata = GetMethodDelegate(library); this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate(library); this.grpcsharp_batch_context_destroy = GetMethodDelegate(library); + this.grpcsharp_request_call_context_create = GetMethodDelegate(library); + this.grpcsharp_request_call_context_call = GetMethodDelegate(library); + this.grpcsharp_request_call_context_method = GetMethodDelegate(library); + this.grpcsharp_request_call_context_host = GetMethodDelegate(library); + this.grpcsharp_request_call_context_deadline = GetMethodDelegate(library); + this.grpcsharp_request_call_context_request_metadata = GetMethodDelegate(library); + this.grpcsharp_request_call_context_destroy = GetMethodDelegate(library); + this.grpcsharp_composite_call_credentials_create = GetMethodDelegate(library); this.grpcsharp_call_credentials_release = GetMethodDelegate(library); @@ -302,14 +308,17 @@ namespace Grpc.Core.Internal public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx); // returns const char* public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); - public delegate CallSafeHandle grpcsharp_batch_context_server_rpc_new_call_delegate(BatchContextSafeHandle ctx); - public delegate IntPtr grpcsharp_batch_context_server_rpc_new_method_delegate(BatchContextSafeHandle ctx); // returns const char* - public delegate IntPtr grpcsharp_batch_context_server_rpc_new_host_delegate(BatchContextSafeHandle ctx); // returns const char* - public delegate Timespec grpcsharp_batch_context_server_rpc_new_deadline_delegate(BatchContextSafeHandle ctx); - public delegate IntPtr grpcsharp_batch_context_server_rpc_new_request_metadata_delegate(BatchContextSafeHandle ctx); public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx); public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx); + public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate(); + public delegate CallSafeHandle grpcsharp_request_call_context_call_delegate(RequestCallContextSafeHandle ctx); + public delegate IntPtr grpcsharp_request_call_context_method_delegate(RequestCallContextSafeHandle ctx); // returns const char* + public delegate IntPtr grpcsharp_request_call_context_host_delegate(RequestCallContextSafeHandle ctx); // returns const char* + public delegate Timespec grpcsharp_request_call_context_deadline_delegate(RequestCallContextSafeHandle ctx); + public delegate IntPtr grpcsharp_request_call_context_request_metadata_delegate(RequestCallContextSafeHandle ctx); + public delegate void grpcsharp_request_call_context_destroy_delegate(IntPtr ctx); + public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2); public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials); @@ -393,7 +402,7 @@ namespace Grpc.Core.Internal public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr); public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server); - public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); + public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, RequestCallContextSafeHandle ctx); public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server); public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); public delegate void grpcsharp_server_destroy_delegate(IntPtr server); diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs new file mode 100644 index 00000000000..ea7819d7b1f --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -0,0 +1,85 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + /// + /// grpcsharp_request_call_context + /// + internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid + { + static readonly NativeMethods Native = NativeMethods.Get(); + + private RequestCallContextSafeHandle() + { + } + + public static RequestCallContextSafeHandle Create() + { + return Native.grpcsharp_request_call_context_create(); + } + + public IntPtr Handle + { + get + { + return handle; + } + } + + // Gets data of server_rpc_new completion. + public ServerRpcNew GetServerRpcNew(Server server) + { + var call = Native.grpcsharp_request_call_context_call(this); + + var method = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_method(this)); + var host = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_host(this)); + var deadline = Native.grpcsharp_request_call_context_deadline(this); + + IntPtr metadataArrayPtr = Native.grpcsharp_request_call_context_request_metadata(this); + var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); + + return new ServerRpcNew(server, call, method, host, deadline, metadata); + } + + protected override bool ReleaseHandle() + { + Native.grpcsharp_request_call_context_destroy(handle); + return true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 014a8db78f2..7d7b8386162 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -85,12 +85,12 @@ namespace Grpc.Core.Internal } } - public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) + public void RequestCall(RequestCallCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + var ctx = RequestCallContextSafeHandle.Create(); + completionQueue.CompletionRegistry.RegisterRequestCallCompletion(ctx, callback); Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index dd4a405ed92..961b911fbb1 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -47,7 +47,7 @@ namespace Grpc.Core /// public class Server { - const int InitialAllowRpcTokenCountPerCq = 10; + const int DefaultRequestCallTokensPerCq = 2000; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -66,7 +66,7 @@ namespace Grpc.Core bool startRequested; volatile bool shutdownRequested; - + int requestCallTokensPerCq = DefaultRequestCallTokensPerCq; /// /// Creates a new server. @@ -132,6 +132,27 @@ namespace Grpc.Core } } + /// + /// Experimental API. Might anytime change without prior notice. + /// Number or calls requested via grpc_server_request_call at any given time for each completion queue. + /// + public int RequestCallTokensPerCompletionQueue + { + get + { + return requestCallTokensPerCq; + } + set + { + lock (myLock) + { + GrpcPreconditions.CheckState(!startRequested); + GrpcPreconditions.CheckArgument(value > 0); + requestCallTokensPerCq = value; + } + } + } + /// /// Starts the server. /// @@ -145,9 +166,7 @@ namespace Grpc.Core handle.Start(); - // Starting with more than one AllowOneRpc tokens can significantly increase - // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) + for (int i = 0; i < requestCallTokensPerCq; i++) { foreach (var cq in environment.CompletionQueues) { @@ -310,7 +329,7 @@ namespace Grpc.Core /// /// Selects corresponding handler for given call and handles the call. /// - private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation) { try { @@ -325,25 +344,41 @@ namespace Grpc.Core { Logger.Warning(e, "Exception while handling RPC."); } + + if (continuation != null) + { + continuation(); + } } /// /// Handles the native callback. /// - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) + private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc(cq)); - + bool nextRpcRequested = false; if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(this); + var newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc, cq); // we don't need to await. + nextRpcRequested = true; + + // Start asynchronous handler for the call. + // Don't await, the continuations will run on gRPC thread pool once triggered + // by cq.Next(). + #pragma warning disable 4014 + HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq)); + #pragma warning restore 4014 } } + + if (!nextRpcRequested) + { + AllowOneRpc(cq); + } } /// diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 068bf709b81..9a5d7869d34 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -84,11 +84,6 @@ typedef struct grpcsharp_batch_context { size_t status_details_capacity; } recv_status_on_client; int recv_close_on_server_cancelled; - struct { - grpc_call *call; - grpc_call_details call_details; - grpc_metadata_array request_metadata; - } server_rpc_new; } grpcsharp_batch_context; GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() { @@ -97,6 +92,18 @@ GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create( return ctx; } +typedef struct { + grpc_call *call; + grpc_call_details call_details; + grpc_metadata_array request_metadata; +} grpcsharp_request_call_context; + +GPR_EXPORT grpcsharp_request_call_context *GPR_CALLTYPE grpcsharp_request_call_context_create() { + grpcsharp_request_call_context *ctx = gpr_malloc(sizeof(grpcsharp_request_call_context)); + memset(ctx, 0, sizeof(grpcsharp_request_call_context)); + return ctx; +} + /* * Destroys array->metadata. * The array pointer itself is not freed. @@ -230,13 +237,20 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_con &(ctx->recv_status_on_client.trailing_metadata)); gpr_free((void *)ctx->recv_status_on_client.status_details); + gpr_free(ctx); +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_request_call_context_destroy(grpcsharp_request_call_context *ctx) { + if (!ctx) { + return; + } /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is supposed to take its ownership. */ - grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); + grpc_call_details_destroy(&(ctx->call_details)); grpcsharp_metadata_array_destroy_metadata_only( - &(ctx->server_rpc_new.request_metadata)); + &(ctx->request_metadata)); gpr_free(ctx); } @@ -303,32 +317,32 @@ grpcsharp_batch_context_recv_status_on_client_trailing_metadata( return &(ctx->recv_status_on_client.trailing_metadata); } -GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call( - const grpcsharp_batch_context *ctx) { - return ctx->server_rpc_new.call; +GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_request_call_context_call( + const grpcsharp_request_call_context *ctx) { + return ctx->call; } GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_batch_context_server_rpc_new_method( - const grpcsharp_batch_context *ctx) { - return ctx->server_rpc_new.call_details.method; +grpcsharp_request_call_context_method( + const grpcsharp_request_call_context *ctx) { + return ctx->call_details.method; } -GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_host( - const grpcsharp_batch_context *ctx) { - return ctx->server_rpc_new.call_details.host; +GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_request_call_context_host( + const grpcsharp_request_call_context *ctx) { + return ctx->call_details.host; } GPR_EXPORT gpr_timespec GPR_CALLTYPE -grpcsharp_batch_context_server_rpc_new_deadline( - const grpcsharp_batch_context *ctx) { - return ctx->server_rpc_new.call_details.deadline; +grpcsharp_request_call_context_deadline( + const grpcsharp_request_call_context *ctx) { + return ctx->call_details.deadline; } GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE -grpcsharp_batch_context_server_rpc_new_request_metadata( - const grpcsharp_batch_context *ctx) { - return &(ctx->server_rpc_new.request_metadata); +grpcsharp_request_call_context_request_metadata( + const grpcsharp_request_call_context *ctx) { + return &(ctx->request_metadata); } GPR_EXPORT int32_t GPR_CALLTYPE @@ -853,10 +867,10 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, - grpcsharp_batch_context *ctx) { + grpcsharp_request_call_context *ctx) { return grpc_server_request_call( - server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), - &(ctx->server_rpc_new.request_metadata), cq, cq, ctx); + server, &(ctx->call), &(ctx->call_details), + &(ctx->request_metadata), cq, cq, ctx); } /* Security */