Merge pull request #578 from jtattermusch/csharp_new_api

Migration of C# to the new C API
pull/589/head
Michael Lumish 10 years ago
commit 5977a6ae14
  1. 6
      src/csharp/GrpcApi/MathGrpc.cs
  2. 6
      src/csharp/GrpcApi/TestServiceGrpc.cs
  3. 18
      src/csharp/GrpcApiTests/MathClientServerTests.cs
  4. 58
      src/csharp/GrpcCore/Calls.cs
  5. 6
      src/csharp/GrpcCore/GrpcCore.csproj
  6. 2
      src/csharp/GrpcCore/GrpcEnvironment.cs
  7. 588
      src/csharp/GrpcCore/Internal/AsyncCall.cs
  8. 96
      src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs
  9. 138
      src/csharp/GrpcCore/Internal/CallSafeHandle.cs
  10. 15
      src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs
  11. 16
      src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs
  12. 224
      src/csharp/GrpcCore/Internal/Event.cs
  13. 47
      src/csharp/GrpcCore/Internal/GrpcThreadPool.cs
  14. 6
      src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs
  15. 24
      src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
  16. 10
      src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs
  17. 31
      src/csharp/GrpcCore/Server.cs
  18. 47
      src/csharp/GrpcCore/ServerCallHandler.cs
  19. 73
      src/csharp/GrpcCoreTests/ClientServerTest.cs
  20. 507
      src/csharp/ext/grpc_csharp_ext.c

@ -81,7 +81,7 @@ namespace math
Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken));
@ -109,10 +109,10 @@ namespace math
return Calls.AsyncUnaryCall(call, request, token);
}
public Task Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{
var call = new Google.GRPC.Core.Call<FibArgs, Num>(fibMethod, channel);
return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))

@ -99,7 +99,7 @@ namespace grpc.testing
Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
@ -141,9 +141,9 @@ namespace grpc.testing
return Calls.AsyncUnaryCall(call, request, token);
}
public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) {
var call = new Google.GRPC.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(streamingOutputCallMethod, channel);
return Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
}
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))

@ -64,6 +64,15 @@ namespace math.Tests
client = MathGrpc.NewStub(channel);
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
[Test]
public void Div1()
{
@ -136,15 +145,6 @@ namespace math.Tests
CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient));
CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder));
}
[TestFixtureTearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -47,50 +47,42 @@ namespace Google.GRPC.Core
{
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
//TODO: implement this in real synchronous style once new GRPC C core API is available.
return AsyncUnaryCall(call, req, token).Result;
//TODO: implement this in real synchronous style.
try {
return AsyncUnaryCall(call, req, token).Result;
} catch(AggregateException ae) {
foreach (var e in ae.InnerExceptions)
{
if (e is RpcException)
{
throw e;
}
}
throw;
}
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
TResponse response = await asyncCall.ReadAsync();
Status status = await asyncCall.Finished;
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
throw new RpcException(status);
}
return response;
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
return await asyncCall.UnaryCallAsync(req);
}
public static async Task AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.StartReadingToStream(outputs);
await asyncCall.WriteAsync(req);
await asyncCall.WritesCompletedAsync();
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
asyncCall.StartServerStreamingCall(req, outputs);
}
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
var task = asyncCall.ReadAsync();
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
var task = asyncCall.ClientStreamingCallAsync();
var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
}
@ -102,12 +94,10 @@ namespace Google.GRPC.Core
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer);
asyncCall.Initialize(call.Channel, call.MethodName);
asyncCall.Start(false, GetCompletionQueue());
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName);
asyncCall.StartReadingToStream(outputs);
var inputs = new StreamingInputObserver<TRequest, TResponse>(asyncCall);
return inputs;
asyncCall.StartDuplexStreamingCall(outputs);
return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
}
private static CompletionQueueSafeHandle GetCompletionQueue() {

@ -47,21 +47,21 @@
<Compile Include="Internal\ChannelSafeHandle.cs" />
<Compile Include="Internal\CompletionQueueSafeHandle.cs" />
<Compile Include="Internal\Enums.cs" />
<Compile Include="Internal\Event.cs" />
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
<Compile Include="Method.cs" />
<Compile Include="ServerCalls.cs" />
<Compile Include="ServerCallHandler.cs" />
<Compile Include="Internal\ServerWritingObserver.cs" />
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
<Compile Include="Utils\RecordingQueue.cs" />
<Compile Include="Internal\ClientStreamingInputObserver.cs" />
<Compile Include="Internal\ServerStreamingOutputObserver.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -42,7 +42,7 @@ namespace Google.GRPC.Core
/// </summary>
public class GrpcEnvironment
{
const int THREAD_POOL_SIZE = 1;
const int THREAD_POOL_SIZE = 4;
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_init();

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -42,171 +42,177 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Listener for call events that can be delivered from a completion queue.
/// Handles native call lifecycle and provides convenience methods.
/// </summary>
internal interface ICallEventListener {
void OnClientMetadata();
void OnRead(byte[] payload);
void OnWriteAccepted(GRPCOpError error);
void OnFinishAccepted(GRPCOpError error);
// ignore the status on server
void OnFinished(Status status);
}
/// <summary>
/// Handle native call lifecycle and provides convenience methods.
/// </summary>
internal class AsyncCall<TWrite, TRead>: ICallEventListener, IDisposable
internal class AsyncCall<TWrite, TRead>
{
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate callbackHandler;
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
readonly CompletionCallbackDelegate writeFinishedHandler;
readonly CompletionCallbackDelegate readFinishedHandler;
readonly CompletionCallbackDelegate halfclosedHandler;
readonly CompletionCallbackDelegate finishedServersideHandler;
object myLock = new object();
bool disposed;
GCHandle gchandle;
CallSafeHandle call;
bool disposed;
bool server;
bool started;
bool errorOccured;
bool cancelRequested;
bool readingDone;
bool halfcloseRequested;
bool halfclosed;
bool doneWithReading;
Nullable<Status> finishedStatus;
bool finished;
// Completion of a pending write if not null.
TaskCompletionSource<object> writeTcs;
// Completion of a pending read if not null.
TaskCompletionSource<TRead> readTcs;
TaskCompletionSource<object> halfcloseTcs = new TaskCompletionSource<object>();
TaskCompletionSource<Status> finishedTcs = new TaskCompletionSource<Status>();
// Completion of a pending halfclose if not null.
TaskCompletionSource<object> halfcloseTcs;
// Completion of a pending unary response if not null.
TaskCompletionSource<TRead> unaryResponseTcs;
// Set after status is received on client. Only used for server streaming and duplex streaming calls.
Nullable<Status> finishedStatus;
TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
// For streaming, the reads will be delivered to this observer.
IObserver<TRead> readObserver;
public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = serializer;
this.deserializer = deserializer;
this.callbackHandler = HandleEvent;
this.unaryResponseHandler = HandleUnaryResponse;
this.finishedHandler = HandleFinished;
this.writeFinishedHandler = HandleWriteFinished;
this.readFinishedHandler = HandleReadFinished;
this.halfclosedHandler = HandleHalfclosed;
this.finishedServersideHandler = HandleFinishedServerside;
}
public Task WriteAsync(TWrite msg)
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName)
{
return StartWrite(msg, false).Task;
InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false);
}
public Task WritesCompletedAsync()
public void InitializeServer(CallSafeHandle call)
{
WritesDone();
return halfcloseTcs.Task;
InitializeInternal(call, true);
}
public Task WriteStatusAsync(Status status)
public Task<TRead> UnaryCallAsync(TWrite msg)
{
WriteStatus(status);
return halfcloseTcs.Task;
}
lock (myLock)
{
started = true;
halfcloseRequested = true;
readingDone = true;
public Task<TRead> ReadAsync()
{
return StartRead().Task;
}
// TODO: handle serialization error...
byte[] payload = serializer(msg);
public Task Halfclosed
{
get
{
return halfcloseTcs.Task;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartUnary(payload, unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
public Task<Status> Finished
public Task<TRead> ClientStreamingCallAsync()
{
get
lock (myLock)
{
return finishedTcs.Task;
started = true;
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TRead>();
call.StartClientStreaming(unaryResponseHandler);
return unaryResponseTcs.Task;
}
}
/// <summary>
/// Initiates reading to given observer.
/// </summary>
public void StartReadingToStream(IObserver<TRead> readObserver) {
public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver)
{
lock (myLock)
{
CheckStarted();
if (this.readObserver != null)
{
throw new InvalidOperationException("Already registered an observer.");
}
started = true;
halfcloseRequested = true;
this.readObserver = readObserver;
StartRead();
}
}
public void Initialize(Channel channel, String methodName) {
lock (myLock)
{
this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture);
// TODO: handle serialization error...
byte[] payload = serializer(msg);
call.StartServerStreaming(payload, finishedHandler);
ReceiveMessageAsync();
}
}
public void InitializeServer(CallSafeHandle call)
public void StartDuplexStreamingCall(IObserver<TRead> readObserver)
{
lock(myLock)
lock (myLock)
{
this.call = call;
started = true;
this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler);
ReceiveMessageAsync();
}
}
// Client only
public void Start(bool buffered, CompletionQueueSafeHandle cq)
public Task ServerSideUnaryRequestCallAsync()
{
lock (myLock)
{
if (started)
{
throw new InvalidOperationException("Already started.");
}
call.Invoke(cq, buffered, callbackHandler, callbackHandler);
started = true;
call.StartServerSide(finishedServersideHandler);
return finishedServersideTcs.Task;
}
}
// Server only
public void Accept(CompletionQueueSafeHandle cq)
public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver)
{
lock (myLock)
{
if (started)
started = true;
call.StartServerSide(finishedServersideHandler);
if (this.readObserver != null)
{
throw new InvalidOperationException("Already started.");
throw new InvalidOperationException("Already registered an observer.");
}
this.readObserver = readObserver;
ReceiveMessageAsync();
call.ServerAccept(cq, callbackHandler);
call.ServerEndInitialMetadata(0);
started = true;
return finishedServersideTcs.Task;
}
}
public TaskCompletionSource<object> StartWrite(TWrite msg, bool buffered)
public Task SendMessageAsync(TWrite msg)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
@ -219,63 +225,62 @@ namespace Google.GRPC.Core.Internal
// TODO: wrap serialization...
byte[] payload = serializer(msg);
call.StartWrite(payload, buffered, callbackHandler);
call.StartSendMessage(payload, writeFinishedHandler);
writeTcs = new TaskCompletionSource<object>();
return writeTcs;
return writeTcs.Task;
}
}
// client only
public void WritesDone()
public Task SendCloseFromClientAsync()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.WritesDone(callbackHandler);
call.StartSendCloseFromClient(halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
// server only
public void WriteStatus(Status status)
public Task SendStatusFromServerAsync(Status status)
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
CheckCancelNotRequested();
if (halfcloseRequested || halfclosed)
if (halfcloseRequested)
{
throw new InvalidOperationException("Already halfclosed.");
}
call.StartWriteStatus(status, callbackHandler);
call.StartSendStatusFromServer(status, halfclosedHandler);
halfcloseRequested = true;
halfcloseTcs = new TaskCompletionSource<object>();
return halfcloseTcs.Task;
}
}
public TaskCompletionSource<TRead> StartRead()
public Task<TRead> ReceiveMessageAsync()
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
CheckNoError();
// TODO: add check for not cancelled?
if (doneWithReading)
if (readingDone)
{
throw new InvalidOperationException("Already read the last message.");
}
@ -285,10 +290,10 @@ namespace Google.GRPC.Core.Internal
throw new InvalidOperationException("Only one read can be pending at a time");
}
call.StartRead(callbackHandler);
call.StartReceiveMessage(readFinishedHandler);
readTcs = new TaskCompletionSource<TRead>();
return readTcs;
return readTcs.Task;
}
}
@ -296,9 +301,8 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel is threadsafe
@ -309,218 +313,304 @@ namespace Google.GRPC.Core.Internal
{
lock (myLock)
{
CheckNotDisposed();
CheckStarted();
CheckNotFinished();
cancelRequested = true;
}
// grpc_call_cancel_with_status is threadsafe
call.CancelWithStatus(status);
}
public void OnClientMetadata()
private void InitializeInternal(CallSafeHandle call, bool server)
{
// TODO: implement....
lock (myLock)
{
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
this.server = server;
}
}
public void OnRead(byte[] payload)
private void CheckStarted()
{
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
lock (myLock)
if (!started)
{
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
doneWithReading = true;
}
observer = readObserver;
throw new InvalidOperationException("Call not started");
}
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
oldTcs.SetResult(msg);
// TODO: make sure we deliver reads in the right order.
private void CheckNotDisposed()
{
if (disposed)
{
throw new InvalidOperationException("Call has already been disposed.");
}
}
if (observer != null)
private void CheckNoError()
{
if (errorOccured)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
throw new InvalidOperationException("Error occured when processing call.");
}
}
// start a new read
StartRead();
}
else
private bool ReleaseResourcesIfPossible()
{
if (!disposed && call != null)
{
if (halfclosed && readingDone && finished)
{
// TODO: wrap to handle exceptions;
observer.OnCompleted();
ReleaseResources();
return true;
}
}
return false;
}
private void ReleaseResources()
{
if (call != null) {
call.Dispose();
}
gchandle.Free();
disposed = true;
}
public void OnWriteAccepted(GRPCOpError error)
private void CompleteStreamObserver(Status status)
{
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
UpdateErrorOccured(error);
oldTcs = writeTcs;
writeTcs = null;
// TODO: wrap to handle exceptions;
readObserver.OnError(new RpcException(status));
} else {
// TODO: wrap to handle exceptions;
readObserver.OnCompleted();
}
}
if (errorOccured)
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
TaskCompletionSource<TRead> tcs;
lock(myLock)
{
finished = true;
halfclosed = true;
tcs = unaryResponseTcs;
ReleaseResourcesIfPossible();
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK)
{
tcs.SetException(new RpcException(
new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.")
));
return;
}
var status = ctx.GetReceivedStatus();
if (status.StatusCode != StatusCode.GRPC_STATUS_OK)
{
tcs.SetException(new RpcException(status));
return;
}
// TODO: handle deserialize error...
var msg = deserializer(ctx.GetReceivedMessage());
tcs.SetResult(msg);
}
catch(Exception e)
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
public void OnFinishAccepted(GRPCOpError error)
private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr)
{
lock (myLock)
try
{
UpdateErrorOccured(error);
halfclosed = true;
}
TaskCompletionSource<object> oldTcs = null;
lock (myLock)
{
oldTcs = writeTcs;
writeTcs = null;
}
if (errorOccured)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
if (errorOccured)
{
// TODO: use the right type of exception...
oldTcs.SetException(new Exception("Write failed"));
}
else
{
// TODO: where does the continuation run?
oldTcs.SetResult(null);
}
}
else
catch(Exception e)
{
halfcloseTcs.SetResult(null);
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
public void OnFinished(Status status)
private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr)
{
lock (myLock)
try
{
finishedStatus = status;
lock (myLock)
{
halfclosed = true;
DisposeResourcesIfNeeded();
}
finishedTcs.SetResult(status);
ReleaseResourcesIfPossible();
}
}
if (error != GRPCOpError.GRPC_OP_OK)
{
halfcloseTcs.SetException(new Exception("Halfclose failed"));
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
else
{
halfcloseTcs.SetResult(null);
}
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
protected virtual void Dispose(bool disposing)
private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr)
{
if (!disposed)
try
{
if (disposing)
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var payload = ctx.GetReceivedMessage();
TaskCompletionSource<TRead> oldTcs = null;
IObserver<TRead> observer = null;
Nullable<Status> status = null;
lock (myLock)
{
if (call != null)
oldTcs = readTcs;
readTcs = null;
if (payload == null)
{
call.Dispose();
readingDone = true;
}
observer = readObserver;
status = finishedStatus;
}
disposed = true;
}
}
private void UpdateErrorOccured(GRPCOpError error)
{
if (error == GRPCOpError.GRPC_OP_ERROR)
{
errorOccured = true;
}
}
// TODO: wrap deserialization...
TRead msg = payload != null ? deserializer(payload) : default(TRead);
private void CheckStarted()
{
if (!started)
{
throw new InvalidOperationException("Call not started");
}
}
oldTcs.SetResult(msg);
private void CheckNoError()
{
if (errorOccured)
// TODO: make sure we deliver reads in the right order.
if (observer != null)
{
if (payload != null)
{
// TODO: wrap to handle exceptions
observer.OnNext(msg);
// start a new read
ReceiveMessageAsync();
}
else
{
if (!server)
{
if (status.HasValue)
{
CompleteStreamObserver(status.Value);
}
}
else
{
// TODO: wrap to handle exceptions..
observer.OnCompleted();
}
// TODO: completeStreamObserver serverside...
}
}
}
catch(Exception e)
{
throw new InvalidOperationException("Error occured when processing call.");
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void CheckNotFinished()
private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr)
{
if (finishedStatus.HasValue)
try
{
throw new InvalidOperationException("Already finished.");
}
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
var status = ctx.GetReceivedStatus();
private void CheckCancelNotRequested()
{
if (cancelRequested)
bool wasReadingDone;
lock (myLock)
{
finished = true;
finishedStatus = status;
wasReadingDone = readingDone;
ReleaseResourcesIfPossible();
}
if (wasReadingDone) {
CompleteStreamObserver(status);
}
}
catch(Exception e)
{
throw new InvalidOperationException("Cancel has been requested.");
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private void DisposeResourcesIfNeeded()
private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr)
{
if (call != null && started && finishedStatus.HasValue)
try
{
// TODO: should we also wait for all the pending events to finish?
call.Dispose();
}
}
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
private void HandleEvent(IntPtr eventPtr) {
try {
var ev = new EventSafeHandleNotOwned(eventPtr);
switch (ev.GetCompletionType())
lock(myLock)
{
case GRPCCompletionType.GRPC_CLIENT_METADATA_READ:
OnClientMetadata();
break;
case GRPCCompletionType.GRPC_READ:
byte[] payload = ev.GetReadData();
OnRead(payload);
break;
finished = true;
case GRPCCompletionType.GRPC_WRITE_ACCEPTED:
OnWriteAccepted(ev.GetWriteAccepted());
break;
// TODO: because of the way server calls are implemented, we need to set
// reading done to true here. Should be fixed in the future.
readingDone = true;
case GRPCCompletionType.GRPC_FINISH_ACCEPTED:
OnFinishAccepted(ev.GetFinishAccepted());
break;
ReleaseResourcesIfPossible();
}
// TODO: handle error ...
case GRPCCompletionType.GRPC_FINISHED:
OnFinished(ev.GetFinished());
break;
finishedServersideTcs.SetResult(null);
default:
throw new ArgumentException("Unexpected completion type");
}
} catch(Exception e) {
}
catch(Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
}
}
}

@ -0,0 +1,96 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
}
public Status GetReceivedStatus()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this));
return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details);
}
public byte[] GetReceivedMessage()
{
IntPtr len = grpcsharp_batch_context_recv_message_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetServerRpcNewCall() {
return grpcsharp_batch_context_server_rpc_new_call(this);
}
public string GetServerRpcNewMethod() {
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
}
}
}

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -38,8 +38,8 @@ using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void EventCallbackDelegate(IntPtr eventPtr);
//TODO: rename the delegate
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal
const UInt32 GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline);
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags);
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")]
static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback,
UInt32 flags);
static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")]
static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback);
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags);
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")]
static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")]
static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")]
static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call,
byte[] buffer, UIntPtr length,
IntPtr tag, UInt32 flags);
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")]
static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call,
byte[] buffer, UIntPtr length,
[MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback,
UInt32 flags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
private CallSafeHandle()
{
}
/// <summary>
/// Creates a client call.
/// </summary>
public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline)
{
return grpcsharp_channel_create_call_old(channel, method, host, deadline);
}
public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered)
{
AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered)));
}
public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback)
{
AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered)));
}
public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag)
private CallSafeHandle()
{
AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag));
}
public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback)
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback));
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public void ServerEndInitialMetadata(UInt32 flags)
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags));
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void StartWrite(byte[] payload, IntPtr tag, bool buffered)
public void StartClientStreaming(CompletionCallbackDelegate callback)
{
grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered));
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback));
}
public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback)
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback)
{
grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered));
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void StartWriteStatus(Status status, IntPtr tag)
public void StartDuplexStreaming(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag));
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback));
}
public void StartWriteStatus(Status status, EventCallbackDelegate callback)
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback));
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length)));
}
public void WritesDone(IntPtr tag)
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_writes_done_old(this, tag));
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
public void WritesDone(EventCallbackDelegate callback)
public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback));
AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
}
public void StartRead(IntPtr tag)
public void StartReceiveMessage(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_read_old(this, tag));
AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
public void StartRead(EventCallbackDelegate callback)
public void StartServerSide(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback));
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public void Cancel()
@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}
}

@ -2,11 +2,11 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
@ -16,7 +16,7 @@
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core.Internal
{
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public StreamingInputObserver(AsyncCall<TWrite, TRead> call)
public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WritesCompletedAsync().Wait();
call.SendCloseFromClientAsync().Wait();
}
public void OnError(Exception error)
@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
call.SendMessageAsync(value).Wait();
}
}
}

@ -45,12 +45,6 @@ namespace Google.GRPC.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
[DllImport("grpc_csharp_ext.dll")]
static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal
return grpcsharp_completion_queue_create();
}
public EventSafeHandle Next(Timespec deadline)
{
return grpcsharp_completion_queue_next(this, deadline);
}
public GRPCCompletionType NextWithCallback()
{
return grpcsharp_completion_queue_next_with_callback(this);
}
public EventSafeHandle Pluck(IntPtr tag, Timespec deadline)
{
return grpcsharp_completion_queue_pluck(this, tag, deadline);
}
public void Shutdown()
{
grpcsharp_completion_queue_shutdown(this);

@ -1,224 +0,0 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using Google.GRPC.Core;
namespace Google.GRPC.Core.Internal
{
/// <summary>
/// grpc_event from grpc/grpc.h
/// </summary>
internal class EventSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_finish(IntPtr ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char*
public GRPCCompletionType GetCompletionType()
{
return grpcsharp_event_type(this);
}
public GRPCOpError GetWriteAccepted()
{
return grpcsharp_event_write_accepted(this);
}
public GRPCOpError GetFinishAccepted()
{
return grpcsharp_event_finish_accepted(this);
}
public Status GetFinished()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
return new Status(grpcsharp_event_finished_status(this), details);
}
public byte[] GetReadData()
{
IntPtr len = grpcsharp_event_read_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetCall() {
return grpcsharp_event_call(this);
}
public string GetServerRpcNewMethod() {
// TODO: can the native method return string directly?
return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
}
//TODO: client_metadata_read event type
protected override bool ReleaseHandle()
{
grpcsharp_event_finish(handle);
return true;
}
}
// TODO: this is basically c&p of EventSafeHandle. Unify!
/// <summary>
/// Not owned version of
/// grpc_event from grpc/grpc.h
/// </summary>
internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_finish(IntPtr ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char*
public EventSafeHandleNotOwned() : base(false)
{
}
public EventSafeHandleNotOwned(IntPtr handle) : base(false)
{
SetHandle(handle);
}
public GRPCCompletionType GetCompletionType()
{
return grpcsharp_event_type(this);
}
public GRPCOpError GetWriteAccepted()
{
return grpcsharp_event_write_accepted(this);
}
public GRPCOpError GetFinishAccepted()
{
return grpcsharp_event_finish_accepted(this);
}
public Status GetFinished()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this));
return new Status(grpcsharp_event_finished_status(this), details);
}
public byte[] GetReadData()
{
IntPtr len = grpcsharp_event_read_length(this);
if (len == new IntPtr(-1))
{
return null;
}
byte[] data = new byte[(int) len];
grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length));
return data;
}
public CallSafeHandle GetCall() {
return grpcsharp_event_call(this);
}
public string GetServerRpcNewMethod() {
// TODO: can the native method return string directly?
return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this));
}
//TODO: client_metadata_read event type
protected override bool ReleaseHandle()
{
grpcsharp_event_finish(handle);
return true;
}
}
}

@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
readonly Action<EventSafeHandle> eventHandler;
CompletionQueueSafeHandle cq;
@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal
this.poolSize = poolSize;
}
internal GrpcThreadPool(int poolSize, Action<EventSafeHandle> eventHandler) {
this.poolSize = poolSize;
this.eventHandler = eventHandler;
}
public void Start() {
lock (myLock)
@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal
}
}
private Thread CreateAndStartThread(int i) {
Action body;
if (eventHandler != null)
{
body = ThreadBodyWithHandler;
}
else
{
body = ThreadBodyNoHandler;
}
var thread = new Thread(new ThreadStart(body));
private Thread CreateAndStartThread(int i)
{
var thread = new Thread(new ThreadStart(RunHandlerLoop));
thread.IsBackground = false;
thread.Start();
if (eventHandler != null)
{
thread.Name = "grpc_server_newrpc " + i;
}
else
{
thread.Name = "grpc " + i;
}
thread.Name = "grpc " + i;
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void ThreadBodyNoHandler()
private void RunHandlerLoop()
{
GRPCCompletionType completionType;
do
@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal
} while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
/// <summary>
/// Body of the polling thread.
/// </summary>
private void ThreadBodyWithHandler()
{
GRPCCompletionType completionType;
do
{
using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) {
completionType = ev.GetCompletionType();
eventHandler(ev);
}
} while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}
}

@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal
return handle == IntPtr.Zero;
}
}
protected override bool ReleaseHandle()
{
// handle is not owned.
return true;
}
}
}

@ -38,24 +38,22 @@ using System.Collections.Concurrent;
namespace Google.GRPC.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")]
static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
// TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
// TODO: check int representation size
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr);
static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
@ -63,8 +61,9 @@ namespace Google.GRPC.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
// TODO: get rid of the old callback style
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")]
static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
@ -81,7 +80,6 @@ namespace Google.GRPC.Core.Internal
public int AddPort(string addr)
{
// TODO: also grpc_server_add_secure_http2_port...
return grpcsharp_server_add_http2_port(this, addr);
}
@ -95,14 +93,14 @@ namespace Google.GRPC.Core.Internal
grpcsharp_server_shutdown(this);
}
public void ShutdownAndNotify(EventCallbackDelegate callback)
public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback)
{
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(EventCallbackDelegate callback)
public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
return grpcsharp_server_request_call_old_CALLBACK(this, callback);
return grpcsharp_server_request_call(this, cq, callback);
}
protected override bool ReleaseHandle()

@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
{
readonly AsyncCall<TWrite, TRead> call;
public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
{
this.call = call;
}
@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal
public void OnCompleted()
{
// TODO: how bad is the Wait here?
call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
}
public void OnError(Exception error)
{
// TODO: handle this...
// TODO: implement this...
throw new InvalidOperationException("This should never be called.");
}
public void OnNext(TWrite value)
{
// TODO: how bad is the Wait here?
call.WriteAsync(value).Wait();
call.SendMessageAsync(value).Wait();
}
}
}

@ -49,8 +49,8 @@ namespace Google.GRPC.Core
{
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate newRpcHandler;
readonly EventCallbackDelegate serverShutdownHandler;
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
@ -61,9 +61,8 @@ namespace Google.GRPC.Core
public Server()
{
// TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newRpcHandler = HandleNewRpc;
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
@ -99,7 +98,7 @@ namespace Google.GRPC.Core
{
var rpcInfo = newRpcQueue.Take();
Console.WriteLine("Server received RPC " + rpcInfo.Method);
//Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
@ -138,23 +137,25 @@ namespace Google.GRPC.Core
private void AllowOneRpc()
{
AssertCallOk(handle.RequestCall(newRpcHandler));
AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
}
private void HandleNewRpc(IntPtr eventPtr)
{
try
{
var ev = new EventSafeHandleNotOwned(eventPtr);
var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod());
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) {
try {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK) {
// TODO: handle error
}
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) {
newRpcQueue.Add(rpcInfo);
}
}
catch (Exception e)
{
} catch(Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}

@ -59,15 +59,16 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync();
var request = asyncCall.ReadAsync().Result;
var request = asyncCall.ReceiveMessageAsync().Result;
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
handler(request, responseObserver);
asyncCall.Halfclosed.Wait();
asyncCall.Finished.Wait();
finishedTask.Wait();
}
}
@ -89,16 +90,11 @@ namespace Google.GRPC.Core
method.RequestMarshaller.Deserializer);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall);
var requestObserver = handler(responseObserver);
// feed the requests
asyncCall.StartReadingToStream(requestObserver);
asyncCall.Halfclosed.Wait();
asyncCall.Finished.Wait();
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver);
finishedTask.Wait();
}
}
@ -110,12 +106,31 @@ namespace Google.GRPC.Core
AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
(payload) => payload, (payload) => payload);
asyncCall.InitializeServer(call);
asyncCall.Accept(cq);
asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
asyncCall.Finished.Wait();
var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>());
asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
finishedTask.Wait();
}
}
internal class NullObserver<T> : IObserver<T>
{
public void OnCompleted()
{
}
public void OnError(Exception error)
{
}
public void OnNext(T value)
{
}
}
}

@ -36,6 +36,7 @@ using NUnit.Framework;
using Google.GRPC.Core;
using Google.GRPC.Core.Internal;
using System.Threading;
using System.Diagnostics;
using System.Threading.Tasks;
using Google.GRPC.Core.Utils;
@ -51,11 +52,21 @@ namespace Google.GRPC.Core.Tests
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
[Test]
public void EmptyCall()
[TestFixtureSetUp]
public void Init()
{
GrpcEnvironment.Initialize();
}
[TestFixtureTearDown]
public void Cleanup()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void UnaryCall()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
@ -69,19 +80,71 @@ namespace Google.GRPC.Core.Tests
var call = new Call<string, string>(unaryEchoStringMethod, channel);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
}
server.ShutdownAsync().Wait();
}
GrpcEnvironment.Shutdown();
[Test]
public void UnaryCallPerformance()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService")
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
var stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < 1000; i++)
{
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
}
stopwatch.Stop();
Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms");
}
server.ShutdownAsync().Wait();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
[Test]
public void UnknownMethodHandler()
{
Server server = new Server();
server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build());
int port = server.AddPort(host + ":0");
server.Start();
using (Channel channel = new Channel(host + ":" + port))
{
var call = new Call<string, string>(unaryEchoStringMethod, channel);
try {
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
Assert.Fail();
} catch(RpcException e) {
Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode);
}
}
server.ShutdownAsync().Wait();
}
private void HandleUnaryEchoString(string request, IObserver<string> responseObserver)
{
responseObserver.OnNext(request);
responseObserver.OnCompleted();
}
}
}

@ -32,9 +32,11 @@
*/
#include <grpc/support/port_platform.h>
#include <grpc/support/alloc.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include <grpc/support/string.h>
#include <string.h>
@ -58,6 +60,139 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
return bb;
}
typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error,
void *batch_context);
/*
* Helper to maintain lifetime of batch op inputs and store batch op outputs.
*/
typedef struct gprcsharp_batch_context {
grpc_metadata_array send_initial_metadata;
grpc_byte_buffer *send_message;
struct {
grpc_metadata_array trailing_metadata;
char *status_details;
} send_status_from_server;
grpc_metadata_array recv_initial_metadata;
grpc_byte_buffer *recv_message;
struct {
grpc_metadata_array trailing_metadata;
grpc_status_code status;
char *status_details;
size_t status_details_capacity;
} recv_status_on_client;
int recv_close_on_server_cancelled;
struct {
grpc_call *call;
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
/* callback will be called upon completion */
callback_funcptr callback;
} grpcsharp_batch_context;
grpcsharp_batch_context *grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
}
/**
* Destroys metadata array including keys and values.
*/
void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) {
if (!array->metadata) {
return;
}
/* TODO: destroy also keys and values */
grpc_metadata_array_destroy(array);
}
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata));
grpc_byte_buffer_destroy(ctx->send_message);
grpcsharp_metadata_array_destroy_recursive(
&(ctx->send_status_from_server.trailing_metadata));
gpr_free(ctx->send_status_from_server.status_details);
grpc_metadata_array_destroy(&(ctx->recv_initial_metadata));
grpc_byte_buffer_destroy(ctx->recv_message);
grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details);
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
supposed
to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata));
gpr_free(ctx);
}
GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(
const grpcsharp_batch_context *ctx) {
if (!ctx->recv_message) {
return -1;
}
return grpc_byte_buffer_length(ctx->recv_message);
}
/*
* Copies data from recv_message to a buffer. Fatal error occurs if
* buffer is too small.
*/
GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer(
const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) {
grpc_byte_buffer_reader *reader;
gpr_slice slice;
size_t offset = 0;
reader = grpc_byte_buffer_reader_create(ctx->recv_message);
while (grpc_byte_buffer_reader_next(reader, &slice)) {
size_t len = GPR_SLICE_LENGTH(slice);
GPR_ASSERT(offset + len <= buffer_len);
memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
offset += len;
gpr_slice_unref(slice);
}
grpc_byte_buffer_reader_destroy(reader);
}
GPR_EXPORT grpc_status_code GPR_CALLTYPE
grpcsharp_batch_context_recv_status_on_client_status(
const grpcsharp_batch_context *ctx) {
return ctx->recv_status_on_client.status;
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_recv_status_on_client_details(
const grpcsharp_batch_context *ctx) {
return ctx->recv_status_on_client.status_details;
}
GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call;
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_batch_context_server_rpc_new_method(
const grpcsharp_batch_context *ctx) {
return ctx->server_rpc_new.call_details.method;
}
/* Init & shutdown */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); }
@ -71,18 +206,6 @@ grpcsharp_completion_queue_create(void) {
return grpc_completion_queue_create();
}
GPR_EXPORT grpc_event *GPR_CALLTYPE
grpcsharp_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline) {
return grpc_completion_queue_next(cq, deadline);
}
GPR_EXPORT grpc_event *GPR_CALLTYPE
grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline) {
return grpc_completion_queue_pluck(cq, tag, deadline);
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_completion_queue_shutdown(grpc_completion_queue *cq) {
grpc_completion_queue_shutdown(cq);
@ -96,12 +219,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event *ev;
grpcsharp_batch_context *batch_context;
grpc_completion_type t;
void(GPR_CALLTYPE * callback)(grpc_event *);
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev->type;
if (ev->tag) {
if (t == GRPC_OP_COMPLETE && ev->tag) {
/* NEW API handler */
batch_context = (grpcsharp_batch_context *)ev->tag;
batch_context->callback(ev->data.op_complete, batch_context);
grpcsharp_batch_context_destroy(batch_context);
} else if (ev->tag) {
/* call the callback in ev->tag */
/* C forbids to cast object pointers to function pointers, so
* we cast to intptr first.
@ -129,195 +258,282 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method,
const char *host, gpr_timespec deadline) {
return grpc_channel_create_call_old(channel, method, host, deadline);
grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call(channel, cq, method, host, deadline);
}
/* Event */
/* Timespec */
GPR_EXPORT void GPR_CALLTYPE grpcsharp_event_finish(grpc_event *event) {
grpc_event_finish(event);
}
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_event_type(const grpc_event *event) {
return event->type;
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
return gpr_inf_future;
}
GPR_EXPORT grpc_op_error GPR_CALLTYPE
grpcsharp_event_write_accepted(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED);
return event->data.invoke_accepted;
GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
return sizeof(gpr_timespec);
}
GPR_EXPORT grpc_op_error GPR_CALLTYPE
grpcsharp_event_finish_accepted(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED);
return event->data.finish_accepted;
}
/* Call */
GPR_EXPORT grpc_status_code GPR_CALLTYPE
grpcsharp_event_finished_status(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISHED);
return event->data.finished.status;
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
return grpc_call_cancel(call);
}
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_event_finished_details(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_FINISHED);
return event->data.finished.details;
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description) {
return grpc_call_cancel_with_status(call, status, description);
}
GPR_EXPORT gpr_intptr GPR_CALLTYPE
grpcsharp_event_read_length(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_READ);
if (!event->data.read) {
return -1;
}
return grpc_byte_buffer_length(event->data.read);
GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
grpc_call_destroy(call);
}
/*
* Copies data from read event to a buffer. Fatal error occurs if
* buffer is too small.
*/
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_event_read_copy_to_buffer(const grpc_event *event, char *buffer,
size_t buffer_len) {
grpc_byte_buffer_reader *reader;
gpr_slice slice;
size_t offset = 0;
grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
const char *buffer, size_t len,
void *tag, gpr_uint32 flags) {
grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
GRPC_CALL_OK);
grpc_byte_buffer_destroy(byte_buffer);
}
GPR_ASSERT(event->type == GRPC_READ);
reader = grpc_byte_buffer_reader_create(event->data.read);
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_ASSERT(event->data.read);
while (grpc_byte_buffer_reader_next(reader, &slice)) {
size_t len = GPR_SLICE_LENGTH(slice);
GPR_ASSERT(offset + len <= buffer_len);
memcpy(buffer + offset, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
offset += len;
gpr_slice_unref(slice);
}
grpc_byte_buffer_reader_destroy(reader);
}
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
GPR_EXPORT grpc_call *GPR_CALLTYPE
grpcsharp_event_call(const grpc_event *event) {
/* we only allow this for newly incoming server calls. */
GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
return event->call;
}
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
GPR_EXPORT const char *GPR_CALLTYPE
grpcsharp_event_server_rpc_new_method(const grpc_event *event) {
GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW);
return event->data.server_rpc_new.method;
}
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
/* Timespec */
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); }
ops[4].op = GRPC_OP_RECV_MESSAGE;
ops[4].data.recv_message = &(ctx->recv_message);
GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_inf_future(void) {
return gpr_inf_future;
}
ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[5].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[5].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[5].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[5].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
return sizeof(gpr_timespec);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Call */
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_add_metadata_old(grpc_call *call, grpc_metadata *metadata,
gpr_uint32 flags) {
return grpc_call_add_metadata_old(call, metadata, flags);
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[2].op = GRPC_OP_RECV_MESSAGE;
ops[2].data.recv_message = &(ctx->recv_message);
ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[3].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[3].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[3].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
void *metadata_read_tag, void *finished_tag,
gpr_uint32 flags) {
return grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, flags);
grpcsharp_call_start_server_streaming(grpc_call *call,
callback_funcptr callback,
const char *send_buffer,
size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[4].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[4].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[4].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[4].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_server_accept_old(grpc_call *call, grpc_completion_queue *cq,
void *finished_tag) {
return grpc_call_server_accept_old(call, cq, finished_tag);
grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[2].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[2].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[2].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[2].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_server_end_initial_metadata_old(grpc_call *call,
gpr_uint32 flags) {
return grpc_call_server_end_initial_metadata_old(call, flags);
}
grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
return grpc_call_cancel(call);
}
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description) {
return grpc_call_cancel_with_status(call, status, description);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_write_old(grpc_call *call, grpc_byte_buffer *byte_buffer,
void *tag, gpr_uint32 flags) {
return grpc_call_start_write_old(call, byte_buffer, tag, flags);
grpcsharp_call_send_close_from_client(grpc_call *call,
callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_write_status_old(grpc_call *call,
grpc_status_code status_code,
const char *status_message, void *tag) {
return grpc_call_start_write_status_old(call, status_code, status_message,
tag);
grpcsharp_call_send_status_from_server(grpc_call *call,
callback_funcptr callback,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
gpr_strdup(status_details);
ops[0].data.send_status_from_server.trailing_metadata = NULL;
ops[0].data.send_status_from_server.trailing_metadata_count = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_writes_done_old(grpc_call *call, void *tag) {
return grpc_call_writes_done_old(call, tag);
grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_read_old(grpc_call *call, void *tag) {
return grpc_call_start_read_old(call, tag);
}
grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
/* TODO: don't use magic number */
grpc_op ops[2];
GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
grpc_call_destroy(call);
}
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
const char *buffer, size_t len,
void *tag, gpr_uint32 flags) {
grpc_byte_buffer *byte_buffer = string_to_byte_buffer(buffer, len);
GPR_ASSERT(grpc_call_start_write_old(call, byte_buffer, tag, flags) ==
GRPC_CALL_OK);
grpc_byte_buffer_destroy(byte_buffer);
}
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
/* Server */
ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[1].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call_old(grpc_server *server, void *tag_new) {
return grpc_server_request_call_old(server, tag_new);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Server */
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
return grpc_server_create(cq, args);
}
GPR_EXPORT int GPR_CALLTYPE
GPR_EXPORT gpr_int32 GPR_CALLTYPE
grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) {
return grpc_server_add_http2_port(server, addr);
}
@ -338,3 +554,14 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) {
GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
grpc_server_destroy(server);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, ctx);
}

Loading…
Cancel
Save