Stop using native callbacks for C#

pull/1966/head
Jan Tattermusch 10 years ago
parent cce361f7ca
commit d367748d89
  1. 4
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  2. 52
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
  3. 64
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
  4. 8
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  5. 4
      src/csharp/Grpc.Core/Grpc.Core.csproj
  6. 31
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  7. 35
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  8. 38
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  9. 8
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  10. 44
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  11. 104
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  12. 60
      src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
  13. 14
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  14. 88
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  15. 2
      src/csharp/Grpc.Core/Internal/DebugStats.cs
  16. 40
      src/csharp/Grpc.Core/Internal/Enums.cs
  17. 20
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  18. 25
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  19. 2
      src/csharp/Grpc.Core/Internal/Timespec.cs
  20. 53
      src/csharp/Grpc.Core/Server.cs
  21. 112
      src/csharp/ext/grpc_csharp_ext.c

@ -3,7 +3,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid>
<OutputType>Library</OutputType>
@ -46,6 +46,8 @@
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueEventTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,52 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class CompletionQueueEventTest
{
[Test]
public void CreateAndDestroy()
{
Assert.AreEqual(CompletionQueueEvent.NativeSize, Marshal.SizeOf(typeof(CompletionQueueEvent)));
}
}
}

@ -0,0 +1,64 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class CompletionQueueSafeHandleTest
{
[Test]
public void CreateAndDestroy()
{
var cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
}
[Test]
public void CreateAndShutdown()
{
var cq = CompletionQueueSafeHandle.Create();
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type);
Assert.AreNotEqual(IntPtr.Zero, ev.success);
Assert.AreEqual(IntPtr.Zero, ev.tag);
}
}
}

@ -48,7 +48,7 @@ namespace Grpc.Core.Tests
int counter;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
@ -88,7 +88,7 @@ namespace Grpc.Core.Tests
[Test]
public void NativeCallbackBenchmark()
{
CompletionCallbackDelegate handler = Handler;
OpCompletionDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@ -114,7 +114,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
grpcsharp_test_callback(new OpCompletionDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}
@ -134,7 +134,7 @@ namespace Grpc.Core.Tests
});
}
private void Handler(bool success, IntPtr ptr)
private void Handler(bool success)
{
counter++;
}

@ -73,7 +73,6 @@
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
@ -101,6 +100,9 @@
<Compile Include="Internal\AtomicCounter.cs" />
<Compile Include="Internal\DebugStats.cs" />
<Compile Include="ServerCallContext.cs" />
<Compile Include="Internal\CompletionQueueEvent.cs" />
<Compile Include="Internal\CompletionRegistry.cs" />
<Compile Include="Internal\BatchContextSafeHandle.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />

@ -54,6 +54,7 @@ namespace Grpc.Core
static volatile GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
bool isClosed;
/// <summary>
@ -105,6 +106,19 @@ namespace Grpc.Core
}
}
internal static CompletionRegistry CompletionRegistry
{
get
{
var inst = instance;
if (inst == null)
{
throw new InvalidOperationException("GRPC environment not initialized");
}
return inst.completionRegistry;
}
}
/// <summary>
/// Creates gRPC environment.
/// </summary>
@ -112,6 +126,7 @@ namespace Grpc.Core
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
completionRegistry = new CompletionRegistry();
threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
@ -139,14 +154,24 @@ namespace Grpc.Core
{
var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
Console.WriteLine("Warning: Detected {0} client calls that weren't disposed properly.", remainingClientCalls);
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
Console.WriteLine("Warning: Detected {0} server calls that weren't disposed properly.", remainingServerCalls);
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
private static void DebugWarning(string message)
{
throw new Exception("Shutdown check: " + message);
}
}
}

@ -47,9 +47,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@ -60,8 +57,6 @@ namespace Grpc.Core.Internal
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
@ -96,7 +91,21 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
using (var ctx = BatchContextSafeHandle.Create())
{
call.StartUnary(payload, ctx, metadataArray);
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
try
{
HandleUnaryResponse(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
}
}
try
@ -129,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartUnary(payload, unaryResponseHandler, metadataArray);
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
}
@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartClientStreaming(unaryResponseHandler, metadataArray);
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@ -175,7 +184,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartServerStreaming(payload, finishedHandler, metadataArray);
call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
}
}
@ -194,7 +203,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartDuplexStreaming(finishedHandler, metadataArray);
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@ -229,7 +238,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@ -274,7 +283,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
lock (myLock)
{
@ -307,7 +316,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();

@ -51,10 +51,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
protected readonly CompletionCallbackDelegate sendFinishedHandler;
protected readonly CompletionCallbackDelegate readFinishedHandler;
protected readonly CompletionCallbackDelegate halfclosedHandler;
protected readonly object myLock = new object();
protected GCHandle gchandle;
@ -77,10 +73,6 @@ namespace Grpc.Core.Internal
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
@ -141,7 +133,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(payload, sendFinishedHandler);
call.StartSendMessage(payload, HandleSendFinished);
sendCompletionDelegate = completionDelegate;
}
}
@ -157,7 +149,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
call.StartReceiveMessage(readFinishedHandler);
call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
}
}
@ -281,30 +273,10 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
/// prevent propagating exceptions accross managed/unmanaged boundary.
/// </summary>
protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
{
return new CompletionCallbackDelegate((success, batchContextPtr) =>
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
handler(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
});
}
/// <summary>
/// Handles send completion.
/// </summary>
private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@ -328,7 +300,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@ -353,7 +325,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
{
var payload = ctx.GetReceivedMessage();

@ -47,12 +47,10 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
@ -72,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
call.StartServerSide(finishedServersideHandler);
call.StartServerSide(HandleFinishedServerside);
return finishedServersideTcs.Task;
}
}
@ -107,7 +105,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendStatusFromServer(status, halfclosedHandler);
call.StartSendStatusFromServer(status, HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
}
@ -121,7 +119,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles the server side close completion.
/// </summary>
private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
{
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();

@ -41,32 +41,50 @@ namespace Grpc.Core.Internal
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_destroy(IntPtr ctx);
private BatchContextSafeHandle()
{
}
public static BatchContextSafeHandle Create()
{
SetHandle(handle);
return grpcsharp_batch_context_create();
}
public IntPtr Handle
{
get
{
return handle;
}
}
public Status GetReceivedStatus()
@ -102,5 +120,11 @@ namespace Grpc.Core.Internal
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
protected override bool ReleaseHandle()
{
grpcsharp_batch_context_destroy(handle);
return true;
}
}
}

@ -37,8 +37,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
@ -57,49 +55,40 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
@ -113,64 +102,84 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
public void StartReceiveMessage(CompletionCallbackDelegate callback)
public void StartReceiveMessage(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_recv_message(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartServerSide(CompletionCallbackDelegate callback)
public void StartServerSide(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void Cancel()
{
AssertCallOk(grpcsharp_call_cancel(this));
grpcsharp_call_cancel(this).CheckOk();
}
public void CancelWithStatus(Status status)
{
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk();
}
protected override bool ReleaseHandle()
@ -179,14 +188,11 @@ namespace Grpc.Core.Internal
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}

@ -0,0 +1,60 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_event from grpc/grpc.h
/// </summary>
[StructLayout(LayoutKind.Sequential)]
internal struct CompletionQueueEvent
{
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_sizeof_grpc_event();
public GRPCCompletionType type;
public int success;
public IntPtr tag;
internal static int NativeSize
{
get
{
return grpcsharp_sizeof_grpc_event();
}
}
}
}

@ -46,7 +46,10 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq);
static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_destroy(IntPtr cq);
@ -60,9 +63,14 @@ namespace Grpc.Core.Internal
return grpcsharp_completion_queue_create();
}
public GRPCCompletionType NextWithCallback()
public CompletionQueueEvent Next()
{
return grpcsharp_completion_queue_next(this);
}
public CompletionQueueEvent Pluck(IntPtr tag)
{
return grpcsharp_completion_queue_next_with_callback(this);
return grpcsharp_completion_queue_pluck(this, tag);
}
public void Shutdown()

@ -0,0 +1,88 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void OpCompletionDelegate(bool success);
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal class CompletionRegistry
{
readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
public void Register(IntPtr key, OpCompletionDelegate callback)
{
DebugStats.PendingBatchCompletions.Increment();
Preconditions.CheckState(dict.TryAdd(key, callback));
}
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
}
public OpCompletionDelegate Extract(IntPtr key)
{
OpCompletionDelegate value;
Preconditions.CheckState(dict.TryRemove(key, out value));
DebugStats.PendingBatchCompletions.Decrement();
return value;
}
private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
}
}

@ -41,5 +41,7 @@ namespace Grpc.Core.Internal
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
}
}

@ -33,35 +33,47 @@
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// from grpc/grpc.h
/// grpc_call_error from grpc/grpc.h
/// </summary>
internal enum GRPCCallError
{
/* everything went ok */
GRPC_CALL_OK = 0,
OK = 0,
/* something failed, we don't know what */
GRPC_CALL_ERROR,
Error,
/* this method is not available on the server */
GRPC_CALL_ERROR_NOT_ON_SERVER,
NotOnServer,
/* this method is not available on the client */
GRPC_CALL_ERROR_NOT_ON_CLIENT,
NotOnClient,
/* this method must be called before server_accept */
GRPC_CALL_ERROR_ALREADY_ACCEPTED,
AlreadyAccepted,
/* this method must be called before invoke */
GRPC_CALL_ERROR_ALREADY_INVOKED,
AlreadyInvoked,
/* this method must be called after invoke */
GRPC_CALL_ERROR_NOT_INVOKED,
NotInvoked,
/* this call is already finished
(writes_done or write_status has already been called) */
GRPC_CALL_ERROR_ALREADY_FINISHED,
AlreadyFinished,
/* there is already an outstanding read/write operation on the call */
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS,
TooManyOperations,
/* the flags value was illegal for this call */
GRPC_CALL_ERROR_INVALID_FLAGS
InvalidFlags
}
internal static class CallErrorExtensions
{
/// <summary>
/// Checks the call API invocation's result is OK.
/// </summary>
public static void CheckOk(this GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
}
}
/// <summary>
@ -70,12 +82,12 @@ namespace Grpc.Core.Internal
internal enum GRPCCompletionType
{
/* Shutting down */
GRPC_QUEUE_SHUTDOWN,
Shutdown,
/* No event before timeout */
GRPC_QUEUE_TIMEOUT,
Timeout,
/* operation completion */
GRPC_OP_COMPLETE
OpComplete
}
}

@ -112,12 +112,26 @@ namespace Grpc.Core.Internal
/// </summary>
private void RunHandlerLoop()
{
GRPCCompletionType completionType;
CompletionQueueEvent ev;
do
{
completionType = cq.NextWithCallback();
ev = cq.Next();
if (ev.type == GRPCCompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
try
{
var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
}
}
while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
while (ev.type != GRPCCompletionType.Shutdown);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}

@ -44,9 +44,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
@ -59,11 +56,14 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
@ -97,14 +97,18 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown(this);
}
public void ShutdownAndNotify(CompletionCallbackDelegate callback)
public void ShutdownAndNotify(BatchCompletionDelegate callback)
{
grpcsharp_server_shutdown_and_notify_callback(this, callback);
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_shutdown_and_notify_callback(this, ctx);
}
public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_request_call(this, cq, ctx).CheckOk();
}
protected override bool ReleaseHandle()
@ -112,10 +116,5 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle);
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
}
}

@ -51,7 +51,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
// TODO: revisit this.
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;

@ -52,10 +52,8 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
// TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly CompletionCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
//readonly OpCompletionDelegate serverShutdownHandler;
//readonly OpCompletionDelegate newServerRpcHandler;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@ -69,8 +67,8 @@ namespace Grpc.Core
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
//this.newServerRpcHandler = HandleNewServerRpc;
//this.serverShutdownHandler = HandleServerShutdown;
}
/// <summary>
@ -144,7 +142,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(serverShutdownHandler);
var ctx = BatchContextSafeHandle.Create();
handle.ShutdownAndNotify(HandleServerShutdown);
await shutdownTcs.Task;
handle.Dispose();
}
@ -194,7 +193,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
}
}
}
@ -222,44 +221,28 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, IntPtr batchContextPtr)
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
// TODO: handle error
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// TODO: handle error
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
{
Task.Run(async () => await InvokeCallHandler(call, method));
}
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
AllowOneRpc();
}
catch (Exception e)
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
{
Console.WriteLine("Caught exception in a native handler: " + e);
Task.Run(async () => await InvokeCallHandler(call, method));
}
AllowOneRpc();
}
/// <summary>
/// Handles native callback.
/// </summary>
private void HandleServerShutdown(bool success, IntPtr batchContextPtr)
private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
shutdownTcs.SetResult(null);
}
private static CompletionQueueSafeHandle GetCompletionQueue()

@ -91,13 +91,9 @@ typedef struct gprcsharp_batch_context {
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
/* callback will be called upon completion */
callback_funcptr callback;
} grpcsharp_batch_context;
grpcsharp_batch_context *grpcsharp_batch_context_create() {
GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
@ -192,7 +188,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
src->metadata = NULL;
}
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
@ -306,25 +302,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
grpc_completion_queue_destroy(cq);
}
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event ev;
grpcsharp_batch_context *batch_context;
grpc_completion_type t;
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev.type;
if (t == GRPC_OP_COMPLETE && ev.tag) {
/* NEW API handler */
batch_context = (grpcsharp_batch_context *)ev.tag;
batch_context->callback((gpr_int32) ev.success, batch_context);
grpcsharp_batch_context_destroy(batch_context);
}
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_next(grpc_completion_queue *cq) {
return grpc_completion_queue_next(cq, gpr_inf_future);
}
/* return completion type to allow some handling for events that have no
* tag - such as GRPC_QUEUE_SHUTDOWN
*/
return t;
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
return grpc_completion_queue_pluck(cq, tag, gpr_inf_future);
}
/* Channel */
@ -413,14 +398,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -454,34 +436,12 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Synchronous unary call */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call,
grpc_completion_queue *dedicated_cq,
callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
send_buffer_len,
initial_metadata) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(dedicated_cq);
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
GRPC_QUEUE_SHUTDOWN);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -510,13 +470,10 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, callback_funcptr callback, const char *send_buffer,
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -549,13 +506,10 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -581,13 +535,10 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
@ -597,12 +548,9 @@ grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
callback_funcptr callback) {
grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
@ -610,14 +558,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@ -629,25 +574,18 @@ grpcsharp_call_send_status_from_server(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
@ -684,9 +622,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_shutdown(grpc_server *server) {
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_shutdown_and_notify_callback(grpc_server *server,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
grpcsharp_batch_context *ctx) {
grpc_server_shutdown_and_notify(server, ctx);
}
@ -696,10 +632,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
grpcsharp_batch_context *ctx) {
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
@ -796,3 +729,8 @@ grpcsharp_test_callback(callback_funcptr callback) {
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; }
/* For testing */
GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) {
return sizeof(grpc_event);
}

Loading…
Cancel
Save