Merge pull request #8396 from jtattermusch/csharp_server_streamline_request_call_invocations

C# server streamline request call invocations
pull/8476/head
Jan Tattermusch 8 years ago committed by GitHub
commit 1445c47901
  1. 1
      src/csharp/Grpc.Core/Grpc.Core.csproj
  2. 15
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  3. 29
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  4. 41
      src/csharp/Grpc.Core/Internal/NativeMethods.cs
  5. 85
      src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs
  6. 6
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  7. 57
      src/csharp/Grpc.Core/Server.cs
  8. 64
      src/csharp/ext/grpc_csharp_ext.c

@ -138,6 +138,7 @@
<Compile Include="Internal\CallError.cs" /> <Compile Include="Internal\CallError.cs" />
<Compile Include="Logging\LogLevel.cs" /> <Compile Include="Logging\LogLevel.cs" />
<Compile Include="Logging\LogLevelFilterLogger.cs" /> <Compile Include="Logging\LogLevelFilterLogger.cs" />
<Compile Include="Internal\RequestCallContextSafeHandle.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="Grpc.Core.nuspec" /> <None Include="Grpc.Core.nuspec" />

@ -93,21 +93,6 @@ namespace Grpc.Core.Internal
return data; return data;
} }
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = Native.grpcsharp_batch_context_server_rpc_new_call(this);
var method = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_method(this));
var host = Marshal.PtrToStringAnsi(Native.grpcsharp_batch_context_server_rpc_new_host(this));
var deadline = Native.grpcsharp_batch_context_server_rpc_new_deadline(this);
IntPtr metadataArrayPtr = Native.grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion. // Gets data of receive_close_on_server completion.
public bool GetReceivedCloseOnServerCancelled() public bool GetReceivedCloseOnServerCancelled()
{ {

@ -44,6 +44,8 @@ namespace Grpc.Core.Internal
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx); internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal delegate void RequestCallCompletionDelegate(bool success, RequestCallContextSafeHandle ctx);
internal class CompletionRegistry internal class CompletionRegistry
{ {
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
@ -68,6 +70,12 @@ namespace Grpc.Core.Internal
Register(ctx.Handle, opCallback); Register(ctx.Handle, opCallback);
} }
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
OpCompletionDelegate opCallback = ((success) => HandleRequestCallCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
}
public OpCompletionDelegate Extract(IntPtr key) public OpCompletionDelegate Extract(IntPtr key)
{ {
OpCompletionDelegate value; OpCompletionDelegate value;
@ -84,7 +92,26 @@ namespace Grpc.Core.Internal
} }
catch (Exception e) catch (Exception e)
{ {
Logger.Error(e, "Exception occured while invoking completion delegate."); Logger.Error(e, "Exception occured while invoking batch completion delegate.");
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
private static void HandleRequestCallCompletion(bool success, RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while invoking request call completion delegate.");
} }
finally finally
{ {

@ -64,14 +64,17 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_call_delegate grpcsharp_batch_context_server_rpc_new_call;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_method_delegate grpcsharp_batch_context_server_rpc_new_method;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_host_delegate grpcsharp_batch_context_server_rpc_new_host;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_deadline_delegate grpcsharp_batch_context_server_rpc_new_deadline;
public readonly Delegates.grpcsharp_batch_context_server_rpc_new_request_metadata_delegate grpcsharp_batch_context_server_rpc_new_request_metadata;
public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled; public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled;
public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy; public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy;
public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create;
public readonly Delegates.grpcsharp_request_call_context_call_delegate grpcsharp_request_call_context_call;
public readonly Delegates.grpcsharp_request_call_context_method_delegate grpcsharp_request_call_context_method;
public readonly Delegates.grpcsharp_request_call_context_host_delegate grpcsharp_request_call_context_host;
public readonly Delegates.grpcsharp_request_call_context_deadline_delegate grpcsharp_request_call_context_deadline;
public readonly Delegates.grpcsharp_request_call_context_request_metadata_delegate grpcsharp_request_call_context_request_metadata;
public readonly Delegates.grpcsharp_request_call_context_destroy_delegate grpcsharp_request_call_context_destroy;
public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create; public readonly Delegates.grpcsharp_composite_call_credentials_create_delegate grpcsharp_composite_call_credentials_create;
public readonly Delegates.grpcsharp_call_credentials_release_delegate grpcsharp_call_credentials_release; public readonly Delegates.grpcsharp_call_credentials_release_delegate grpcsharp_call_credentials_release;
@ -170,14 +173,17 @@ namespace Grpc.Core.Internal
this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate>(library); this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library); this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_call = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_call_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_method = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_method_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_host = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_host_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_deadline = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_deadline_delegate>(library);
this.grpcsharp_batch_context_server_rpc_new_request_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_server_rpc_new_request_metadata_delegate>(library);
this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library); this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library);
this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library); this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library);
this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library);
this.grpcsharp_request_call_context_call = GetMethodDelegate<Delegates.grpcsharp_request_call_context_call_delegate>(library);
this.grpcsharp_request_call_context_method = GetMethodDelegate<Delegates.grpcsharp_request_call_context_method_delegate>(library);
this.grpcsharp_request_call_context_host = GetMethodDelegate<Delegates.grpcsharp_request_call_context_host_delegate>(library);
this.grpcsharp_request_call_context_deadline = GetMethodDelegate<Delegates.grpcsharp_request_call_context_deadline_delegate>(library);
this.grpcsharp_request_call_context_request_metadata = GetMethodDelegate<Delegates.grpcsharp_request_call_context_request_metadata_delegate>(library);
this.grpcsharp_request_call_context_destroy = GetMethodDelegate<Delegates.grpcsharp_request_call_context_destroy_delegate>(library);
this.grpcsharp_composite_call_credentials_create = GetMethodDelegate<Delegates.grpcsharp_composite_call_credentials_create_delegate>(library); this.grpcsharp_composite_call_credentials_create = GetMethodDelegate<Delegates.grpcsharp_composite_call_credentials_create_delegate>(library);
this.grpcsharp_call_credentials_release = GetMethodDelegate<Delegates.grpcsharp_call_credentials_release_delegate>(library); this.grpcsharp_call_credentials_release = GetMethodDelegate<Delegates.grpcsharp_call_credentials_release_delegate>(library);
@ -302,14 +308,17 @@ namespace Grpc.Core.Internal
public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx); public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx); // returns const char* public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
public delegate CallSafeHandle grpcsharp_batch_context_server_rpc_new_call_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_method_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_host_delegate(BatchContextSafeHandle ctx); // returns const char*
public delegate Timespec grpcsharp_batch_context_server_rpc_new_deadline_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_server_rpc_new_request_metadata_delegate(BatchContextSafeHandle ctx);
public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx); public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx); public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx);
public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate();
public delegate CallSafeHandle grpcsharp_request_call_context_call_delegate(RequestCallContextSafeHandle ctx);
public delegate IntPtr grpcsharp_request_call_context_method_delegate(RequestCallContextSafeHandle ctx); // returns const char*
public delegate IntPtr grpcsharp_request_call_context_host_delegate(RequestCallContextSafeHandle ctx); // returns const char*
public delegate Timespec grpcsharp_request_call_context_deadline_delegate(RequestCallContextSafeHandle ctx);
public delegate IntPtr grpcsharp_request_call_context_request_metadata_delegate(RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_request_call_context_destroy_delegate(IntPtr ctx);
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2); public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);
public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials); public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials);
@ -393,7 +402,7 @@ namespace Grpc.Core.Internal
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr); public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server); public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, RequestCallContextSafeHandle ctx);
public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server); public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server);
public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_destroy_delegate(IntPtr server); public delegate void grpcsharp_server_destroy_delegate(IntPtr server);

@ -0,0 +1,85 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Grpc.Core;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpcsharp_request_call_context
/// </summary>
internal class RequestCallContextSafeHandle : SafeHandleZeroIsInvalid
{
static readonly NativeMethods Native = NativeMethods.Get();
private RequestCallContextSafeHandle()
{
}
public static RequestCallContextSafeHandle Create()
{
return Native.grpcsharp_request_call_context_create();
}
public IntPtr Handle
{
get
{
return handle;
}
}
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew(Server server)
{
var call = Native.grpcsharp_request_call_context_call(this);
var method = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_method(this));
var host = Marshal.PtrToStringAnsi(Native.grpcsharp_request_call_context_host(this));
var deadline = Native.grpcsharp_request_call_context_deadline(this);
IntPtr metadataArrayPtr = Native.grpcsharp_request_call_context_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
protected override bool ReleaseHandle()
{
Native.grpcsharp_request_call_context_destroy(handle);
return true;
}
}
}

@ -85,12 +85,12 @@ namespace Grpc.Core.Internal
} }
} }
public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) public void RequestCall(RequestCallCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{ {
using (completionQueue.NewScope()) using (completionQueue.NewScope())
{ {
var ctx = BatchContextSafeHandle.Create(); var ctx = RequestCallContextSafeHandle.Create();
completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); completionQueue.CompletionRegistry.RegisterRequestCallCompletion(ctx, callback);
Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
} }
} }

@ -47,7 +47,7 @@ namespace Grpc.Core
/// </summary> /// </summary>
public class Server public class Server
{ {
const int InitialAllowRpcTokenCountPerCq = 10; const int DefaultRequestCallTokensPerCq = 2000;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter(); readonly AtomicCounter activeCallCounter = new AtomicCounter();
@ -66,7 +66,7 @@ namespace Grpc.Core
bool startRequested; bool startRequested;
volatile bool shutdownRequested; volatile bool shutdownRequested;
int requestCallTokensPerCq = DefaultRequestCallTokensPerCq;
/// <summary> /// <summary>
/// Creates a new server. /// Creates a new server.
@ -132,6 +132,27 @@ namespace Grpc.Core
} }
} }
/// <summary>
/// Experimental API. Might anytime change without prior notice.
/// Number or calls requested via grpc_server_request_call at any given time for each completion queue.
/// </summary>
public int RequestCallTokensPerCompletionQueue
{
get
{
return requestCallTokensPerCq;
}
set
{
lock (myLock)
{
GrpcPreconditions.CheckState(!startRequested);
GrpcPreconditions.CheckArgument(value > 0);
requestCallTokensPerCq = value;
}
}
}
/// <summary> /// <summary>
/// Starts the server. /// Starts the server.
/// </summary> /// </summary>
@ -145,9 +166,7 @@ namespace Grpc.Core
handle.Start(); handle.Start();
// Starting with more than one AllowOneRpc tokens can significantly increase for (int i = 0; i < requestCallTokensPerCq; i++)
// unary RPC throughput.
for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{ {
foreach (var cq in environment.CompletionQueues) foreach (var cq in environment.CompletionQueues)
{ {
@ -310,7 +329,7 @@ namespace Grpc.Core
/// <summary> /// <summary>
/// Selects corresponding handler for given call and handles the call. /// Selects corresponding handler for given call and handles the call.
/// </summary> /// </summary>
private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)
{ {
try try
{ {
@ -325,24 +344,40 @@ namespace Grpc.Core
{ {
Logger.Warning(e, "Exception while handling RPC."); Logger.Warning(e, "Exception while handling RPC.");
} }
if (continuation != null)
{
continuation();
}
} }
/// <summary> /// <summary>
/// Handles the native callback. /// Handles the native callback.
/// </summary> /// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{ {
Task.Run(() => AllowOneRpc(cq)); bool nextRpcRequested = false;
if (success) if (success)
{ {
ServerRpcNew newRpc = ctx.GetServerRpcNew(this); var newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call // after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid) if (!newRpc.Call.IsInvalid)
{ {
HandleCallAsync(newRpc, cq); // we don't need to await. nextRpcRequested = true;
// Start asynchronous handler for the call.
// Don't await, the continuations will run on gRPC thread pool once triggered
// by cq.Next().
#pragma warning disable 4014
HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq));
#pragma warning restore 4014
}
} }
if (!nextRpcRequested)
{
AllowOneRpc(cq);
} }
} }

@ -84,11 +84,6 @@ typedef struct grpcsharp_batch_context {
size_t status_details_capacity; size_t status_details_capacity;
} recv_status_on_client; } recv_status_on_client;
int recv_close_on_server_cancelled; int recv_close_on_server_cancelled;
struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
} grpcsharp_batch_context; } grpcsharp_batch_context;
GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() { GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
@ -97,6 +92,18 @@ GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create(
return ctx; return ctx;
} }
typedef struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} grpcsharp_request_call_context;
GPR_EXPORT grpcsharp_request_call_context *GPR_CALLTYPE grpcsharp_request_call_context_create() {
grpcsharp_request_call_context *ctx = gpr_malloc(sizeof(grpcsharp_request_call_context));
memset(ctx, 0, sizeof(grpcsharp_request_call_context));
return ctx;
}
/* /*
* Destroys array->metadata. * Destroys array->metadata.
* The array pointer itself is not freed. * The array pointer itself is not freed.
@ -230,13 +237,20 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_con
&(ctx->recv_status_on_client.trailing_metadata)); &(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details); gpr_free((void *)ctx->recv_status_on_client.status_details);
gpr_free(ctx);
}
GPR_EXPORT void GPR_CALLTYPE grpcsharp_request_call_context_destroy(grpcsharp_request_call_context *ctx) {
if (!ctx) {
return;
}
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
supposed supposed
to take its ownership. */ to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); grpc_call_details_destroy(&(ctx->call_details));
grpcsharp_metadata_array_destroy_metadata_only( grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->server_rpc_new.request_metadata)); &(ctx->request_metadata));
gpr_free(ctx); gpr_free(ctx);
} }
@ -303,32 +317,32 @@ grpcsharp_batch_context_recv_status_on_client_trailing_metadata(
return &(ctx->recv_status_on_client.trailing_metadata); return &(ctx->recv_status_on_client.trailing_metadata);
} }
GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call( GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_request_call_context_call(
const grpcsharp_batch_context *ctx) { const grpcsharp_request_call_context *ctx) {
return ctx->server_rpc_new.call; return ctx->call;
} }
GPR_EXPORT const char *GPR_CALLTYPE GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_method( grpcsharp_request_call_context_method(
const grpcsharp_batch_context *ctx) { const grpcsharp_request_call_context *ctx) {
return ctx->server_rpc_new.call_details.method; return ctx->call_details.method;
} }
GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_host( GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_request_call_context_host(
const grpcsharp_batch_context *ctx) { const grpcsharp_request_call_context *ctx) {
return ctx->server_rpc_new.call_details.host; return ctx->call_details.host;
} }
GPR_EXPORT gpr_timespec GPR_CALLTYPE GPR_EXPORT gpr_timespec GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_deadline( grpcsharp_request_call_context_deadline(
const grpcsharp_batch_context *ctx) { const grpcsharp_request_call_context *ctx) {
return ctx->server_rpc_new.call_details.deadline; return ctx->call_details.deadline;
} }
GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_request_metadata( grpcsharp_request_call_context_request_metadata(
const grpcsharp_batch_context *ctx) { const grpcsharp_request_call_context *ctx) {
return &(ctx->server_rpc_new.request_metadata); return &(ctx->request_metadata);
} }
GPR_EXPORT int32_t GPR_CALLTYPE GPR_EXPORT int32_t GPR_CALLTYPE
@ -853,10 +867,10 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
grpcsharp_batch_context *ctx) { grpcsharp_request_call_context *ctx) {
return grpc_server_request_call( return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), server, &(ctx->call), &(ctx->call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx); &(ctx->request_metadata), cq, cq, ctx);
} }
/* Security */ /* Security */

Loading…
Cancel
Save