From 60ea9130e10fad6a1de0275120f14416712de449 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 12:40:25 -0800 Subject: [PATCH 01/10] remove nonexistent stream_context_interface.h from build.json --- Makefile | 1 - build.json | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 485042b5b3e..e45b128dbed 100644 --- a/Makefile +++ b/Makefile @@ -3025,7 +3025,6 @@ PUBLIC_HEADERS_CXX += \ include/grpc++/server_credentials.h \ include/grpc++/status.h \ include/grpc++/stream.h \ - include/grpc++/stream_context_interface.h \ LIBGRPC++_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_SRC)))) diff --git a/build.json b/build.json index 07af69126b8..4bca52e3027 100644 --- a/build.json +++ b/build.json @@ -415,8 +415,7 @@ "include/grpc++/server_context.h", "include/grpc++/server_credentials.h", "include/grpc++/status.h", - "include/grpc++/stream.h", - "include/grpc++/stream_context_interface.h" + "include/grpc++/stream.h" ], "headers": [ "src/cpp/client/channel.h", From 337a2ddba59563e7370b133d63ab8bd9ebeb7232 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 13 Feb 2015 15:41:41 -0800 Subject: [PATCH 02/10] migration to new C API --- src/csharp/GrpcApi/MathGrpc.cs | 6 +- src/csharp/GrpcApi/TestServiceGrpc.cs | 6 +- src/csharp/GrpcCore/Calls.cs | 58 +-- src/csharp/GrpcCore/GrpcCore.csproj | 6 +- src/csharp/GrpcCore/Internal/AsyncCall.cs | 493 ++++++++++-------- .../Internal/BatchContextSafeHandle.cs | 96 ++++ .../GrpcCore/Internal/CallSafeHandle.cs | 138 ++--- ...ver.cs => ClientStreamingInputObserver.cs} | 15 +- .../Internal/CompletionQueueSafeHandle.cs | 16 - src/csharp/GrpcCore/Internal/Event.cs | 224 -------- .../GrpcCore/Internal/GrpcThreadPool.cs | 47 +- .../Internal/SafeHandleZeroIsInvalid.cs | 6 + .../GrpcCore/Internal/ServerSafeHandle.cs | 16 +- ...er.cs => ServerStreamingOutputObserver.cs} | 10 +- src/csharp/GrpcCore/Server.cs | 31 +- src/csharp/GrpcCore/ServerCallHandler.cs | 28 +- src/csharp/GrpcCoreTests/ClientServerTest.cs | 67 ++- src/csharp/ext/grpc_csharp_ext.c | 365 +++++++++++++ 18 files changed, 953 insertions(+), 675 deletions(-) create mode 100644 src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs rename src/csharp/GrpcCore/Internal/{StreamingInputObserver.cs => ClientStreamingInputObserver.cs} (88%) delete mode 100644 src/csharp/GrpcCore/Internal/Event.cs rename src/csharp/GrpcCore/Internal/{ServerWritingObserver.cs => ServerStreamingOutputObserver.cs} (87%) diff --git a/src/csharp/GrpcApi/MathGrpc.cs b/src/csharp/GrpcApi/MathGrpc.cs index caea1608ecf..44e704e4969 100644 --- a/src/csharp/GrpcApi/MathGrpc.cs +++ b/src/csharp/GrpcApi/MathGrpc.cs @@ -81,7 +81,7 @@ namespace math Task DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - Task Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); + void Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)); @@ -109,10 +109,10 @@ namespace math return Calls.AsyncUnaryCall(call, request, token); } - public Task Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) + public void Fib(FibArgs request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call(fibMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult Sum(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcApi/TestServiceGrpc.cs b/src/csharp/GrpcApi/TestServiceGrpc.cs index 6534a44ef4f..64d5c095633 100644 --- a/src/csharp/GrpcApi/TestServiceGrpc.cs +++ b/src/csharp/GrpcApi/TestServiceGrpc.cs @@ -99,7 +99,7 @@ namespace grpc.testing Task UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); + void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); ClientStreamingAsyncResult StreamingInputCall(CancellationToken token = default(CancellationToken)); @@ -141,9 +141,9 @@ namespace grpc.testing return Calls.AsyncUnaryCall(call, request, token); } - public Task StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) { + public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) { var call = new Google.GRPC.Core.Call(streamingOutputCallMethod, channel); - return Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + Calls.AsyncServerStreamingCall(call, request, responseObserver, token); } public ClientStreamingAsyncResult StreamingInputCall(CancellationToken token = default(CancellationToken)) diff --git a/src/csharp/GrpcCore/Calls.cs b/src/csharp/GrpcCore/Calls.cs index d89d9a16f9b..e5ddd879d68 100644 --- a/src/csharp/GrpcCore/Calls.cs +++ b/src/csharp/GrpcCore/Calls.cs @@ -47,50 +47,42 @@ namespace Google.GRPC.Core { public static TResponse BlockingUnaryCall(Call call, TRequest req, CancellationToken token) { - //TODO: implement this in real synchronous style once new GRPC C core API is available. - return AsyncUnaryCall(call, req, token).Result; + //TODO: implement this in real synchronous style. + try { + return AsyncUnaryCall(call, req, token).Result; + } catch(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw; + } } public static async Task AsyncUnaryCall(Call call, TRequest req, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); - - TResponse response = await asyncCall.ReadAsync(); - - Status status = await asyncCall.Finished; - - if (status.StatusCode != StatusCode.GRPC_STATUS_OK) - { - throw new RpcException(status); - } - return response; + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + return await asyncCall.UnaryCallAsync(req); } - public static async Task AsyncServerStreamingCall(Call call, TRequest req, IObserver outputs, CancellationToken token) + public static void AsyncServerStreamingCall(Call call, TRequest req, IObserver outputs, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - asyncCall.StartReadingToStream(outputs); - - await asyncCall.WriteAsync(req); - await asyncCall.WritesCompletedAsync(); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + asyncCall.StartServerStreamingCall(req, outputs); } public static ClientStreamingAsyncResult AsyncClientStreamingCall(Call call, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); - - var task = asyncCall.ReadAsync(); - var inputs = new StreamingInputObserver(asyncCall); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + var task = asyncCall.ClientStreamingCallAsync(); + var inputs = new ClientStreamingInputObserver(asyncCall); return new ClientStreamingAsyncResult(task, inputs); } @@ -102,12 +94,10 @@ namespace Google.GRPC.Core public static IObserver DuplexStreamingCall(Call call, IObserver outputs, CancellationToken token) { var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); - asyncCall.Initialize(call.Channel, call.MethodName); - asyncCall.Start(false, GetCompletionQueue()); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); - asyncCall.StartReadingToStream(outputs); - var inputs = new StreamingInputObserver(asyncCall); - return inputs; + asyncCall.StartDuplexStreamingCall(outputs); + return new ClientStreamingInputObserver(asyncCall); } private static CompletionQueueSafeHandle GetCompletionQueue() { diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index 34b9f6dfb82..a574f181c8a 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -47,21 +47,21 @@ - - - + + + diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index d5f3239e1e0..ae7428978ea 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -41,39 +41,28 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { - /// - /// Listener for call events that can be delivered from a completion queue. - /// - internal interface ICallEventListener { - - void OnClientMetadata(); - - void OnRead(byte[] payload); - - void OnWriteAccepted(GRPCOpError error); - - void OnFinishAccepted(GRPCOpError error); - - // ignore the status on server - void OnFinished(Status status); - } - /// /// Handle native call lifecycle and provides convenience methods. /// - internal class AsyncCall: ICallEventListener, IDisposable + internal class AsyncCall : IDisposable { readonly Func serializer; readonly Func deserializer; - // TODO: make sure the delegate doesn't get garbage collected while + // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. - readonly EventCallbackDelegate callbackHandler; + readonly CompletionCallbackDelegate unaryResponseHandler; + readonly CompletionCallbackDelegate finishedHandler; + readonly CompletionCallbackDelegate writeFinishedHandler; + readonly CompletionCallbackDelegate readFinishedHandler; + readonly CompletionCallbackDelegate halfclosedHandler; + readonly CompletionCallbackDelegate finishedServersideHandler; object myLock = new object(); bool disposed; CallSafeHandle call; + bool server; bool started; bool errorOccured; @@ -85,54 +74,25 @@ namespace Google.GRPC.Core.Internal TaskCompletionSource writeTcs; TaskCompletionSource readTcs; + + TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); TaskCompletionSource halfcloseTcs = new TaskCompletionSource(); TaskCompletionSource finishedTcs = new TaskCompletionSource(); + TaskCompletionSource unaryResponseTcs; + IObserver readObserver; public AsyncCall(Func serializer, Func deserializer) { this.serializer = serializer; this.deserializer = deserializer; - this.callbackHandler = HandleEvent; - } - - public Task WriteAsync(TWrite msg) - { - return StartWrite(msg, false).Task; - } - - public Task WritesCompletedAsync() - { - WritesDone(); - return halfcloseTcs.Task; - } - - public Task WriteStatusAsync(Status status) - { - WriteStatus(status); - return halfcloseTcs.Task; - } - - public Task ReadAsync() - { - return StartRead().Task; - } - - public Task Halfclosed - { - get - { - return halfcloseTcs.Task; - } - } - - public Task Finished - { - get - { - return finishedTcs.Task; - } + this.unaryResponseHandler = HandleUnaryResponseCompletion; + this.finishedHandler = HandleFinished; + this.writeFinishedHandler = HandleWriteFinished; + this.readFinishedHandler = HandleReadFinished; + this.halfclosedHandler = HandleHalfclosed; + this.finishedServersideHandler = HandleFinishedServerside; } /// @@ -147,14 +107,14 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already registered an observer."); } this.readObserver = readObserver; - StartRead(); + ReceiveMessageAsync(); } } - public void Initialize(Channel channel, String methodName) { + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { lock (myLock) { - this.call = CallSafeHandle.Create(channel.Handle, methodName, channel.Target, Timespec.InfFuture); + this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); } } @@ -163,42 +123,75 @@ namespace Google.GRPC.Core.Internal lock(myLock) { this.call = call; + started = true; + server = true; } } - // Client only - public void Start(bool buffered, CompletionQueueSafeHandle cq) + + public Task UnaryCallAsync(TWrite msg) { lock (myLock) { - if (started) - { - throw new InvalidOperationException("Already started."); - } - - call.Invoke(cq, buffered, callbackHandler, callbackHandler); started = true; + halfcloseRequested = true; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource(); + call.StartUnary(payload, unaryResponseHandler); + + return unaryResponseTcs.Task; } } - // Server only - public void Accept(CompletionQueueSafeHandle cq) + public Task ClientStreamingCallAsync() { lock (myLock) { - if (started) - { - throw new InvalidOperationException("Already started."); - } + started = true; + + unaryResponseTcs = new TaskCompletionSource(); + call.StartClientStreaming(unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } - call.ServerAccept(cq, callbackHandler); - call.ServerEndInitialMetadata(0); + public void StartServerStreamingCall(TWrite msg, IObserver readObserver) + { + lock (myLock) + { started = true; + halfcloseRequested = true; + + this.readObserver = readObserver; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + call.StartServerStreaming(payload, finishedHandler); + + ReceiveMessageAsync(); } } - public TaskCompletionSource StartWrite(TWrite msg, bool buffered) + public void StartDuplexStreamingCall(IObserver readObserver) { + lock (myLock) + { + started = true; + + this.readObserver = readObserver; + + call.StartDuplexStreaming(finishedHandler); + + ReceiveMessageAsync(); + } + } + + public Task SendMessageAsync(TWrite msg) { lock (myLock) { CheckStarted(); @@ -219,14 +212,13 @@ namespace Google.GRPC.Core.Internal // TODO: wrap serialization... byte[] payload = serializer(msg); - call.StartWrite(payload, buffered, callbackHandler); + call.StartSendMessage(payload, writeFinishedHandler); writeTcs = new TaskCompletionSource(); - return writeTcs; + return writeTcs.Task; } } - // client only - public void WritesDone() + public Task SendCloseFromClientAsync() { lock (myLock) { @@ -240,13 +232,13 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already halfclosed."); } - call.WritesDone(callbackHandler); + call.StartSendCloseFromClient(halfclosedHandler); halfcloseRequested = true; + return halfcloseTcs.Task; } } - // server only - public void WriteStatus(Status status) + public Task SendStatusFromServerAsync(Status status) { lock (myLock) { @@ -260,12 +252,13 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Already halfclosed."); } - call.StartWriteStatus(status, callbackHandler); + call.StartSendStatusFromServer(status, halfclosedHandler); halfcloseRequested = true; + return halfcloseTcs.Task; } } - public TaskCompletionSource StartRead() + public Task ReceiveMessageAsync() { lock (myLock) { @@ -285,10 +278,19 @@ namespace Google.GRPC.Core.Internal throw new InvalidOperationException("Only one read can be pending at a time"); } - call.StartRead(callbackHandler); + call.StartReceiveMessage(readFinishedHandler); readTcs = new TaskCompletionSource(); - return readTcs; + return readTcs.Task; + } + } + + internal Task StartServerSide() + { + lock (myLock) + { + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; } } @@ -317,107 +319,7 @@ namespace Google.GRPC.Core.Internal // grpc_call_cancel_with_status is threadsafe call.CancelWithStatus(status); } - - public void OnClientMetadata() - { - // TODO: implement.... - } - - public void OnRead(byte[] payload) - { - TaskCompletionSource oldTcs = null; - IObserver observer = null; - lock (myLock) - { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - doneWithReading = true; - } - observer = readObserver; - } - - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - - oldTcs.SetResult(msg); - - // TODO: make sure we deliver reads in the right order. - - if (observer != null) - { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); - - // start a new read - StartRead(); - } - else - { - // TODO: wrap to handle exceptions; - observer.OnCompleted(); - } - - } - } - - public void OnWriteAccepted(GRPCOpError error) - { - TaskCompletionSource oldTcs = null; - lock (myLock) - { - UpdateErrorOccured(error); - oldTcs = writeTcs; - writeTcs = null; - } - - if (errorOccured) - { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else - { - // TODO: where does the continuation run? - oldTcs.SetResult(null); - } - } - - public void OnFinishAccepted(GRPCOpError error) - { - lock (myLock) - { - UpdateErrorOccured(error); - halfclosed = true; - } - - if (errorOccured) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); - - } - else - { - halfcloseTcs.SetResult(null); - } - - } - - public void OnFinished(Status status) - { - lock (myLock) - { - finishedStatus = status; - - DisposeResourcesIfNeeded(); - } - finishedTcs.SetResult(status); - - } - + public void Dispose() { Dispose(true); @@ -434,7 +336,7 @@ namespace Google.GRPC.Core.Internal { call.Dispose(); } - } + } disposed = true; } } @@ -489,38 +391,195 @@ namespace Google.GRPC.Core.Internal } } - private void HandleEvent(IntPtr eventPtr) { + private void CompleteStreamObserver(Status status) { + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) + { + // TODO: wrap to handle exceptions; + readObserver.OnError(new RpcException(status)); + } else { + // TODO: wrap to handle exceptions; + readObserver.OnCompleted(); + } + } + + private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) { + try { + + TaskCompletionSource tcs; + lock(myLock) { + tcs = unaryResponseTcs; + } + + // we're done with this call, get rid of the native object. + call.Dispose(); + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + tcs.SetException(new RpcException( + new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.") + )); + return; + } + + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { + tcs.SetException(new RpcException(status)); + return; + } + + // TODO: handle deserialize error... + var msg = deserializer(ctx.GetReceivedMessage()); + tcs.SetResult(msg); + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) { + try { + + TaskCompletionSource oldTcs = null; + lock (myLock) + { + UpdateErrorOccured(error); + oldTcs = writeTcs; + writeTcs = null; + } + + if (errorOccured) + { + // TODO: use the right type of exception... + oldTcs.SetException(new Exception("Write failed")); + } + else + { + // TODO: where does the continuation run? + oldTcs.SetResult(null); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) { + try { + lock (myLock) + { + UpdateErrorOccured(error); + halfclosed = true; + } + + if (errorOccured) + { + halfcloseTcs.SetException(new Exception("Halfclose failed")); + + } + else + { + halfcloseTcs.SetResult(null); + } + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) { try { - var ev = new EventSafeHandleNotOwned(eventPtr); - switch (ev.GetCompletionType()) + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var payload = ctx.GetReceivedMessage(); + + TaskCompletionSource oldTcs = null; + IObserver observer = null; + + Nullable status = null; + + lock (myLock) { - case GRPCCompletionType.GRPC_CLIENT_METADATA_READ: - OnClientMetadata(); - break; + oldTcs = readTcs; + readTcs = null; + if (payload == null) + { + doneWithReading = true; + } + observer = readObserver; + status = finishedStatus; + } + + // TODO: wrap deserialization... + TRead msg = payload != null ? deserializer(payload) : default(TRead); - case GRPCCompletionType.GRPC_READ: - byte[] payload = ev.GetReadData(); - OnRead(payload); - break; + oldTcs.SetResult(msg); - case GRPCCompletionType.GRPC_WRITE_ACCEPTED: - OnWriteAccepted(ev.GetWriteAccepted()); - break; + // TODO: make sure we deliver reads in the right order. - case GRPCCompletionType.GRPC_FINISH_ACCEPTED: - OnFinishAccepted(ev.GetFinishAccepted()); - break; + if (observer != null) { + if (payload != null) + { + // TODO: wrap to handle exceptions + observer.OnNext(msg); + + // start a new read + ReceiveMessageAsync(); + } + else + { + if (!server) { + if (status.HasValue) { + CompleteStreamObserver(status.Value); + } + } else { + // TODO: wrap to handle exceptions.. + observer.OnCompleted(); + } + // TODO: completeStreamObserver serverside... + } + } + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var status = ctx.GetReceivedStatus(); + + bool wasDoneWithReading; + + lock (myLock) + { + finishedStatus = status; - case GRPCCompletionType.GRPC_FINISHED: - OnFinished(ev.GetFinished()); - break; + DisposeResourcesIfNeeded(); - default: - throw new ArgumentException("Unexpected completion type"); + wasDoneWithReading = doneWithReading; } + + if (wasDoneWithReading) { + CompleteStreamObserver(status); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + + call.Dispose(); + } catch(Exception e) { Console.WriteLine("Caught exception in a native handler: " + e); } } } -} +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs b/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs new file mode 100644 index 00000000000..ddfd94a3b56 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Google.GRPC.Core; + +namespace Google.GRPC.Core.Internal +{ + /// + /// Not owned version of + /// grpcsharp_batch_context + /// + internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + + [DllImport("grpc_csharp_ext.dll")] + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + { + SetHandle(handle); + } + + public Status GetReceivedStatus() + { + // TODO: can the native method return string directly? + string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); + return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + } + + public byte[] GetReceivedMessage() + { + IntPtr len = grpcsharp_batch_context_recv_message_length(this); + if (len == new IntPtr(-1)) + { + return null; + } + byte[] data = new byte[(int) len]; + grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length)); + return data; + } + + public CallSafeHandle GetServerRpcNewCall() { + return grpcsharp_batch_context_server_rpc_new_call(this); + } + + public string GetServerRpcNewMethod() { + return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + } + } +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs index e9ccd8d5f99..55d66a62ca7 100644 --- a/src/csharp/GrpcCore/Internal/CallSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CallSafeHandle.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -38,8 +38,8 @@ using Google.GRPC.Core; namespace Google.GRPC.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - internal delegate void EventCallbackDelegate(IntPtr eventPtr); + //TODO: rename the delegate + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); /// /// grpc_call from @@ -49,142 +49,108 @@ namespace Google.GRPC.Core.Internal const UInt32 GRPC_WRITE_BUFFER_HINT = 1; [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call_old(ChannelSafeHandle channel, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_add_metadata(CallSafeHandle call, IntPtr metadata, UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_invoke_old(CallSafeHandle call, CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, UInt32 flags); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_invoke_old")] - static extern GRPCCallError grpcsharp_call_invoke_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle cq, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate metadataReadCallback, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback, - UInt32 flags); + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_accept_old(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, IntPtr finishedTag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_server_accept_old")] - static extern GRPCCallError grpcsharp_call_server_accept_old_CALLBACK(CallSafeHandle call, CompletionQueueSafeHandle completionQueue, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate finishedCallback); + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_server_end_initial_metadata_old(CallSafeHandle call, UInt32 flags); + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_write_status_old(CallSafeHandle call, StatusCode statusCode, string statusMessage, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_status_old")] - static extern GRPCCallError grpcsharp_call_start_write_status_old_CALLBACK(CallSafeHandle call, StatusCode statusCode, string statusMessage, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_writes_done_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_writes_done_old")] - static extern GRPCCallError grpcsharp_call_writes_done_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCallError grpcsharp_call_start_read_old(CallSafeHandle call, IntPtr tag); - - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_read_old")] - static extern GRPCCallError grpcsharp_call_start_read_old_CALLBACK(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_call_start_write_from_copied_buffer(CallSafeHandle call, - byte[] buffer, UIntPtr length, - IntPtr tag, UInt32 flags); + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_call_start_write_from_copied_buffer")] - static extern void grpcsharp_call_start_write_from_copied_buffer_CALLBACK(CallSafeHandle call, - byte[] buffer, UIntPtr length, - [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback, - UInt32 flags); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); - [DllImport("grpc_csharp_ext.dll")] + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_call_destroy(IntPtr call); - private CallSafeHandle() - { - } - - /// - /// Creates a client call. - /// - public static CallSafeHandle Create(ChannelSafeHandle channel, string method, string host, Timespec deadline) - { - return grpcsharp_channel_create_call_old(channel, method, host, deadline); - } - - public void Invoke(CompletionQueueSafeHandle cq, IntPtr metadataReadTag, IntPtr finishedTag, bool buffered) - { - AssertCallOk(grpcsharp_call_invoke_old(this, cq, metadataReadTag, finishedTag, GetFlags(buffered))); - } - - public void Invoke(CompletionQueueSafeHandle cq, bool buffered, EventCallbackDelegate metadataReadCallback, EventCallbackDelegate finishedCallback) - { - AssertCallOk(grpcsharp_call_invoke_old_CALLBACK(this, cq, metadataReadCallback, finishedCallback, GetFlags(buffered))); - } - public void ServerAccept(CompletionQueueSafeHandle cq, IntPtr finishedTag) + private CallSafeHandle() { - AssertCallOk(grpcsharp_call_server_accept_old(this, cq, finishedTag)); } - public void ServerAccept(CompletionQueueSafeHandle cq, EventCallbackDelegate callback) + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - AssertCallOk(grpcsharp_call_server_accept_old_CALLBACK(this, cq, callback)); + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); } - public void ServerEndInitialMetadata(UInt32 flags) + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_server_end_initial_metadata_old(this, flags)); + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWrite(byte[] payload, IntPtr tag, bool buffered) + public void StartClientStreaming(CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer(this, payload, new UIntPtr((ulong) payload.Length), tag, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); } - public void StartWrite(byte[] payload, bool buffered, EventCallbackDelegate callback) + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) { - grpcsharp_call_start_write_from_copied_buffer_CALLBACK(this, payload, new UIntPtr((ulong) payload.Length), callback, GetFlags(buffered)); + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void StartWriteStatus(Status status, IntPtr tag) + public void StartDuplexStreaming(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old(this, status.StatusCode, status.Detail, tag)); + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); } - public void StartWriteStatus(Status status, EventCallbackDelegate callback) + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_write_status_old_CALLBACK(this, status.StatusCode, status.Detail, callback)); + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); } - public void WritesDone(IntPtr tag) + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old(this, tag)); + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); } - public void WritesDone(EventCallbackDelegate callback) + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_writes_done_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); } - public void StartRead(IntPtr tag) + public void StartReceiveMessage(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old(this, tag)); + AssertCallOk(grpcsharp_call_recv_message(this, callback)); } - public void StartRead(EventCallbackDelegate callback) + public void StartServerSide(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_read_old_CALLBACK(this, callback)); + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); } public void Cancel() @@ -212,4 +178,4 @@ namespace Google.GRPC.Core.Internal return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; } } -} +} \ No newline at end of file diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs similarity index 88% rename from src/csharp/GrpcCore/Internal/StreamingInputObserver.cs rename to src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs index 60837de5e65..4d10a9bdf96 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/ClientStreamingInputObserver.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -36,19 +36,20 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { - internal class StreamingInputObserver : IObserver + internal class ClientStreamingInputObserver : IObserver { readonly AsyncCall call; - public StreamingInputObserver(AsyncCall call) + public ClientStreamingInputObserver(AsyncCall call) { this.call = call; } public void OnCompleted() { + // TODO: how bad is the Wait here? - call.WritesCompletedAsync().Wait(); + call.SendCloseFromClientAsync().Wait(); } public void OnError(Exception error) @@ -59,7 +60,7 @@ namespace Google.GRPC.Core.Internal public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs index 666f220b8c8..5ea436df197 100644 --- a/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/CompletionQueueSafeHandle.cs @@ -45,12 +45,6 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); - [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] - static extern EventSafeHandle grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq, Timespec deadline); - [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); @@ -69,21 +63,11 @@ namespace Google.GRPC.Core.Internal return grpcsharp_completion_queue_create(); } - public EventSafeHandle Next(Timespec deadline) - { - return grpcsharp_completion_queue_next(this, deadline); - } - public GRPCCompletionType NextWithCallback() { return grpcsharp_completion_queue_next_with_callback(this); } - public EventSafeHandle Pluck(IntPtr tag, Timespec deadline) - { - return grpcsharp_completion_queue_pluck(this, tag, deadline); - } - public void Shutdown() { grpcsharp_completion_queue_shutdown(this); diff --git a/src/csharp/GrpcCore/Internal/Event.cs b/src/csharp/GrpcCore/Internal/Event.cs deleted file mode 100644 index 6116e0975af..00000000000 --- a/src/csharp/GrpcCore/Internal/Event.cs +++ /dev/null @@ -1,224 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -#endregion - -using System; -using System.Runtime.InteropServices; -using Google.GRPC.Core; - -namespace Google.GRPC.Core.Internal -{ - /// - /// grpc_event from grpc/grpc.h - /// - internal class EventSafeHandle : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandle ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandle ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandle ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandle ev); // returns const char* - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } - - // TODO: this is basically c&p of EventSafeHandle. Unify! - /// - /// Not owned version of - /// grpc_event from grpc/grpc.h - /// - internal class EventSafeHandleNotOwned : SafeHandleZeroIsInvalid - { - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_finish(IntPtr ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCCompletionType grpcsharp_event_type(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_event_call(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_write_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern GRPCOpError grpcsharp_event_finish_accepted(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern StatusCode grpcsharp_event_finished_status(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_finished_details(EventSafeHandleNotOwned ev); // returns const char* - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_read_length(EventSafeHandleNotOwned ev); - - [DllImport("grpc_csharp_ext.dll")] - static extern void grpcsharp_event_read_copy_to_buffer(EventSafeHandleNotOwned ev, byte[] buffer, UIntPtr bufferLen); - - [DllImport("grpc_csharp_ext.dll")] - static extern IntPtr grpcsharp_event_server_rpc_new_method(EventSafeHandleNotOwned ev); // returns const char* - - public EventSafeHandleNotOwned() : base(false) - { - } - - public EventSafeHandleNotOwned(IntPtr handle) : base(false) - { - SetHandle(handle); - } - - public GRPCCompletionType GetCompletionType() - { - return grpcsharp_event_type(this); - } - - public GRPCOpError GetWriteAccepted() - { - return grpcsharp_event_write_accepted(this); - } - - public GRPCOpError GetFinishAccepted() - { - return grpcsharp_event_finish_accepted(this); - } - - public Status GetFinished() - { - // TODO: can the native method return string directly? - string details = Marshal.PtrToStringAnsi(grpcsharp_event_finished_details(this)); - return new Status(grpcsharp_event_finished_status(this), details); - } - - public byte[] GetReadData() - { - IntPtr len = grpcsharp_event_read_length(this); - if (len == new IntPtr(-1)) - { - return null; - } - byte[] data = new byte[(int) len]; - grpcsharp_event_read_copy_to_buffer(this, data, new UIntPtr((ulong)data.Length)); - return data; - } - - public CallSafeHandle GetCall() { - return grpcsharp_event_call(this); - } - - public string GetServerRpcNewMethod() { - // TODO: can the native method return string directly? - return Marshal.PtrToStringAnsi(grpcsharp_event_server_rpc_new_method(this)); - } - - //TODO: client_metadata_read event type - - protected override bool ReleaseHandle() - { - grpcsharp_event_finish(handle); - return true; - } - } -} diff --git a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs index f8154fa2505..634a0b2d721 100644 --- a/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs +++ b/src/csharp/GrpcCore/Internal/GrpcThreadPool.cs @@ -48,7 +48,6 @@ namespace Google.GRPC.Core.Internal readonly object myLock = new object(); readonly List threads = new List(); readonly int poolSize; - readonly Action eventHandler; CompletionQueueSafeHandle cq; @@ -56,11 +55,6 @@ namespace Google.GRPC.Core.Internal this.poolSize = poolSize; } - internal GrpcThreadPool(int poolSize, Action eventHandler) { - this.poolSize = poolSize; - this.eventHandler = eventHandler; - } - public void Start() { lock (myLock) @@ -104,34 +98,19 @@ namespace Google.GRPC.Core.Internal } } - private Thread CreateAndStartThread(int i) { - Action body; - if (eventHandler != null) - { - body = ThreadBodyWithHandler; - } - else - { - body = ThreadBodyNoHandler; - } - var thread = new Thread(new ThreadStart(body)); + private Thread CreateAndStartThread(int i) + { + var thread = new Thread(new ThreadStart(RunHandlerLoop)); thread.IsBackground = false; thread.Start(); - if (eventHandler != null) - { - thread.Name = "grpc_server_newrpc " + i; - } - else - { - thread.Name = "grpc " + i; - } + thread.Name = "grpc " + i; return thread; } /// /// Body of the polling thread. /// - private void ThreadBodyNoHandler() + private void RunHandlerLoop() { GRPCCompletionType completionType; do @@ -140,22 +119,6 @@ namespace Google.GRPC.Core.Internal } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); } - - /// - /// Body of the polling thread. - /// - private void ThreadBodyWithHandler() - { - GRPCCompletionType completionType; - do - { - using (EventSafeHandle ev = cq.Next(Timespec.InfFuture)) { - completionType = ev.GetCompletionType(); - eventHandler(ev); - } - } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); - Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); - } } } diff --git a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs index 74a8ef7b6ea..59f08d4ca89 100644 --- a/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs +++ b/src/csharp/GrpcCore/Internal/SafeHandleZeroIsInvalid.cs @@ -56,6 +56,12 @@ namespace Google.GRPC.Core.Internal return handle == IntPtr.Zero; } } + + protected override bool ReleaseHandle() + { + // handle is not owned. + return true; + } } } diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index c91de97ce3b..c0966028008 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -38,13 +38,16 @@ using System.Collections.Concurrent; namespace Google.GRPC.Core.Internal { + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + /// /// grpc_server from grpc/grpc.h /// internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { - [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_request_call_old")] - static extern GRPCCallError grpcsharp_server_request_call_old_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); @@ -63,8 +66,9 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + // TODO: get rid of the old callback style [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] - static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_destroy(IntPtr server); @@ -95,14 +99,14 @@ namespace Google.GRPC.Core.Internal grpcsharp_server_shutdown(this); } - public void ShutdownAndNotify(EventCallbackDelegate callback) + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) { grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); } - public GRPCCallError RequestCall(EventCallbackDelegate callback) + public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call_old_CALLBACK(this, callback); + return grpcsharp_server_request_call(this, cq, callback); } protected override bool ReleaseHandle() diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs similarity index 87% rename from src/csharp/GrpcCore/Internal/ServerWritingObserver.cs rename to src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs index 1d29864b9f4..e9cb65cb3b0 100644 --- a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs +++ b/src/csharp/GrpcCore/Internal/ServerStreamingOutputObserver.cs @@ -40,11 +40,11 @@ namespace Google.GRPC.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// - internal class ServerWritingObserver : IObserver + internal class ServerStreamingOutputObserver : IObserver { readonly AsyncCall call; - public ServerWritingObserver(AsyncCall call) + public ServerStreamingOutputObserver(AsyncCall call) { this.call = call; } @@ -52,19 +52,19 @@ namespace Google.GRPC.Core.Internal public void OnCompleted() { // TODO: how bad is the Wait here? - call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + call.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); } public void OnError(Exception error) { - // TODO: handle this... + // TODO: implement this... throw new InvalidOperationException("This should never be called."); } public void OnNext(TWrite value) { // TODO: how bad is the Wait here? - call.WriteAsync(value).Wait(); + call.SendMessageAsync(value).Wait(); } } } diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 0882a612995..91842d81821 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -49,8 +49,8 @@ namespace Google.GRPC.Core { // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. - readonly EventCallbackDelegate newRpcHandler; - readonly EventCallbackDelegate serverShutdownHandler; + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; readonly BlockingCollection newRpcQueue = new BlockingCollection(); readonly ServerSafeHandle handle; @@ -61,9 +61,8 @@ namespace Google.GRPC.Core public Server() { - // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); - this.newRpcHandler = HandleNewRpc; + this.newServerRpcHandler = HandleNewServerRpc; this.serverShutdownHandler = HandleServerShutdown; } @@ -99,7 +98,7 @@ namespace Google.GRPC.Core { var rpcInfo = newRpcQueue.Take(); - Console.WriteLine("Server received RPC " + rpcInfo.Method); + //Console.WriteLine("Server received RPC " + rpcInfo.Method); IServerCallHandler callHandler; if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) @@ -138,23 +137,25 @@ namespace Google.GRPC.Core private void AllowOneRpc() { - AssertCallOk(handle.RequestCall(newRpcHandler)); + AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); } - private void HandleNewRpc(IntPtr eventPtr) - { - try - { - var ev = new EventSafeHandleNotOwned(eventPtr); - var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod()); + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error + } + + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); // after server shutdown, the callback returns with null call if (!rpcInfo.Call.IsInvalid) { newRpcQueue.Add(rpcInfo); } - } - catch (Exception e) - { + + } catch(Exception e) { Console.WriteLine("Caught exception in a native handler: " + e); } } diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index bcce4a091fb..3bc3b153964 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -59,15 +59,16 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); + + var finishedTask = asyncCall.StartServerSide(); - var request = asyncCall.ReadAsync().Result; + var request = asyncCall.ReceiveMessageAsync().Result; - var responseObserver = new ServerWritingObserver(asyncCall); + var responseObserver = new ServerStreamingOutputObserver(asyncCall); handler(request, responseObserver); - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + finishedTask.Wait(); + } } @@ -89,16 +90,16 @@ namespace Google.GRPC.Core method.RequestMarshaller.Deserializer); asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - var responseObserver = new ServerWritingObserver(asyncCall); + var finishedTask = asyncCall.StartServerSide(); + + var responseObserver = new ServerStreamingOutputObserver(asyncCall); var requestObserver = handler(responseObserver); // feed the requests asyncCall.StartReadingToStream(requestObserver); - asyncCall.Halfclosed.Wait(); - asyncCall.Finished.Wait(); + finishedTask.Wait(); } } @@ -110,11 +111,14 @@ namespace Google.GRPC.Core AsyncCall asyncCall = new AsyncCall( (payload) => payload, (payload) => payload); + asyncCall.InitializeServer(call); - asyncCall.Accept(cq); - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); - asyncCall.Finished.Wait(); + var finishedTask = asyncCall.StartServerSide(); + + asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + finishedTask.Wait(); } } } diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 44011565204..dd3fc7038e7 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -36,6 +36,7 @@ using NUnit.Framework; using Google.GRPC.Core; using Google.GRPC.Core.Internal; using System.Threading; +using System.Diagnostics; using System.Threading.Tasks; using Google.GRPC.Core.Utils; @@ -52,7 +53,7 @@ namespace Google.GRPC.Core.Tests Marshallers.StringMarshaller); [Test] - public void EmptyCall() + public void UnaryCall() { GrpcEnvironment.Initialize(); @@ -69,6 +70,7 @@ namespace Google.GRPC.Core.Tests var call = new Call(unaryEchoStringMethod, channel); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } @@ -77,11 +79,72 @@ namespace Google.GRPC.Core.Tests GrpcEnvironment.Shutdown(); } + [Test] + public void UnaryCallPerformance() + { + GrpcEnvironment.Initialize(); + + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService") + .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call(unaryEchoStringMethod, channel); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < 1000; i++) + { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + } + + server.ShutdownAsync().Wait(); + + GrpcEnvironment.Shutdown(); + } + + + [Test] + public void UnknownMethodHandler() + { + GrpcEnvironment.Initialize(); + + Server server = new Server(); + server.AddServiceDefinition( + ServerServiceDefinition.CreateBuilder("someService").Build()); + + int port = server.AddPort(host + ":0"); + server.Start(); + + using (Channel channel = new Channel(host + ":" + port)) + { + var call = new Call(unaryEchoStringMethod, channel); + + try { + Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + Assert.Fail(); + } catch(RpcException e) { + Assert.AreEqual(StatusCode.GRPC_STATUS_UNIMPLEMENTED, e.Status.StatusCode); + } + } + + server.ShutdownAsync().Wait(); + + GrpcEnvironment.Shutdown(); + } + private void HandleUnaryEchoString(string request, IObserver responseObserver) { responseObserver.OnNext(request); responseObserver.OnCompleted(); } - } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index c7949af44ec..eff862537b0 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -32,9 +32,11 @@ */ #include +#include #include #include #include +#include #include @@ -58,6 +60,134 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { return bb; } +typedef void(GPR_CALLTYPE * callback_funcptr)(grpc_op_error op_error, void *batch_context); + +/* + * Helper to maintain lifetime of batch op inputs and store batch op outputs. + */ +typedef struct gprcsharp_batch_context { + grpc_metadata_array send_initial_metadata; + grpc_byte_buffer *send_message; + struct { + grpc_metadata_array trailing_metadata; + char *status_details; + } send_status_from_server; + grpc_metadata_array recv_initial_metadata; + grpc_byte_buffer *recv_message; + struct { + grpc_metadata_array trailing_metadata; + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + } recv_status_on_client; + int recv_close_on_server_cancelled; + struct { + grpc_call *call; + grpc_call_details call_details; + grpc_metadata_array request_metadata; + } server_rpc_new; + + /* callback will be called upon completion */ + callback_funcptr callback; + +} grpcsharp_batch_context; + +grpcsharp_batch_context *grpcsharp_batch_context_create() { + grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context)); + memset(ctx, 0, sizeof(grpcsharp_batch_context)); + return ctx; +} + +/** + * Destroys metadata array including keys and values. + */ +void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) { + if (!array->metadata) { + return; + } + /* TODO: destroy also keys and values */ + grpc_metadata_array_destroy(array); +} + +void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { + if (!ctx) { + return; + } + grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->send_message); + + grpcsharp_metadata_array_destroy_recursive(&(ctx->send_status_from_server.trailing_metadata)); + gpr_free(ctx->send_status_from_server.status_details); + + grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); + + grpc_byte_buffer_destroy(ctx->recv_message); + + grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); + gpr_free((void*) ctx->recv_status_on_client.status_details); + + /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is supposed + to take its ownership. */ + + grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); + grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata)); + + gpr_free(ctx); +} + +GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(const grpcsharp_batch_context *ctx) { + if (!ctx->recv_message) { + return -1; + } + return grpc_byte_buffer_length(ctx->recv_message); +} + +/* + * Copies data from recv_message to a buffer. Fatal error occurs if + * buffer is too small. + */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_batch_context_recv_message_to_buffer(const grpcsharp_batch_context *ctx, char *buffer, + size_t buffer_len) { + grpc_byte_buffer_reader *reader; + gpr_slice slice; + size_t offset = 0; + + reader = grpc_byte_buffer_reader_create(ctx->recv_message); + + while (grpc_byte_buffer_reader_next(reader, &slice)) { + size_t len = GPR_SLICE_LENGTH(slice); + GPR_ASSERT(offset + len <= buffer_len); + memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), + GPR_SLICE_LENGTH(slice)); + offset += len; + gpr_slice_unref(slice); + } + grpc_byte_buffer_reader_destroy(reader); +} + +GPR_EXPORT grpc_status_code GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_status(const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_details(const grpcsharp_batch_context *ctx) { + return ctx->recv_status_on_client.status_details; +} + +GPR_EXPORT grpc_call* GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_call(const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_method(const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call_details.method; +} + + /* Init & shutdown */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); } @@ -96,11 +226,18 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) { GPR_EXPORT grpc_completion_type GPR_CALLTYPE grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) { grpc_event *ev; + grpcsharp_batch_context *batch_context; grpc_completion_type t; void(GPR_CALLTYPE * callback)(grpc_event *); ev = grpc_completion_queue_next(cq, gpr_inf_future); t = ev->type; + if (t == GRPC_OP_COMPLETE && ev->tag) { + /* NEW API handler */ + batch_context = (grpcsharp_batch_context *) ev->tag; + batch_context->callback(ev->data.op_complete, batch_context); + grpcsharp_batch_context_destroy(batch_context); + } else if (ev->tag) { /* call the callback in ev->tag */ /* C forbids to cast object pointers to function pointers, so @@ -128,6 +265,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { grpc_channel_destroy(channel); } +GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, + const char *method, + const char *host, gpr_timespec deadline) { + return grpc_channel_create_call(channel, cq, method, host, deadline); +} + GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method, const char *host, gpr_timespec deadline) { @@ -145,6 +288,12 @@ grpcsharp_event_type(const grpc_event *event) { return event->type; } +GPR_EXPORT grpc_op_error GPR_CALLTYPE +grpcsharp_event_op_complete(const grpc_event *event) { + GPR_ASSERT(event->type == GRPC_OP_COMPLETE); + return event->data.op_complete; +} + GPR_EXPORT grpc_op_error GPR_CALLTYPE grpcsharp_event_write_accepted(const grpc_event *event) { GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED); @@ -343,3 +492,219 @@ grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) { GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { grpc_server_destroy(server); } + +/* New API Experiments */ + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[6]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; + + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message = &(ctx->recv_message); + + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[5].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[5].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[5].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[4]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_MESSAGE; + ops[2].data.recv_message = &(ctx->recv_message); + + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[5]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[1].data.send_message = ctx->send_message; + + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[4].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[4].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[4].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[4].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[3]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + /* TODO: implement sending the metadata... */ + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + /* ctx->send_initial_metadata is already zeroed out. */ + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + + ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[2].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); + ops[2].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + /* not using preallocation for status_details */ + ops[2].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); + ops[2].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); + ops[0].data.send_message = ctx->send_message; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, + callback_funcptr callback, grpc_status_code status_code, const char* status_details) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + ops[0].data.send_status_from_server.status = status_code; + ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); + ops[0].data.send_status_from_server.trailing_metadata = NULL; + ops[0].data.send_status_from_server.trailing_metadata_count = 0; + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_message(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_RECV_MESSAGE; + ops[0].data.recv_message = &(ctx->recv_message); + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_call *call, + callback_funcptr callback) { + /* TODO: don't use magic number */ + grpc_op ops[2]; + + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[0].data.send_initial_metadata.count = 0; + ops[0].data.send_initial_metadata.metadata = NULL; + + ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops[1].data.recv_close_on_server.cancelled = (&ctx->recv_close_on_server_cancelled); + + return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); +} + + +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, + grpc_completion_queue *cq, callback_funcptr callback) { + + grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); + ctx->callback = callback; + + return grpc_server_request_call(server, &(ctx->server_rpc_new.call), + &(ctx->server_rpc_new.call_details), + &(ctx->server_rpc_new.request_metadata), + cq, ctx); +} + + + + From 3f8962c52d06602f6be73bed56e72e76f6ea7407 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 17 Feb 2015 19:20:39 -0800 Subject: [PATCH 03/10] removal of unused methods in extension library --- src/csharp/ext/grpc_csharp_ext.c | 236 +++++-------------------------- 1 file changed, 34 insertions(+), 202 deletions(-) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index eff862537b0..2961a708be8 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -201,18 +201,6 @@ grpcsharp_completion_queue_create(void) { return grpc_completion_queue_create(); } -GPR_EXPORT grpc_event *GPR_CALLTYPE -grpcsharp_completion_queue_next(grpc_completion_queue *cq, - gpr_timespec deadline) { - return grpc_completion_queue_next(cq, deadline); -} - -GPR_EXPORT grpc_event *GPR_CALLTYPE -grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag, - gpr_timespec deadline) { - return grpc_completion_queue_pluck(cq, tag, deadline); -} - GPR_EXPORT void GPR_CALLTYPE grpcsharp_completion_queue_shutdown(grpc_completion_queue *cq) { grpc_completion_queue_shutdown(cq); @@ -271,101 +259,6 @@ GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call(grpc_channel *c return grpc_channel_create_call(channel, cq, method, host, deadline); } -GPR_EXPORT grpc_call *GPR_CALLTYPE -grpcsharp_channel_create_call_old(grpc_channel *channel, const char *method, - const char *host, gpr_timespec deadline) { - return grpc_channel_create_call_old(channel, method, host, deadline); -} - -/* Event */ - -GPR_EXPORT void GPR_CALLTYPE grpcsharp_event_finish(grpc_event *event) { - grpc_event_finish(event); -} - -GPR_EXPORT grpc_completion_type GPR_CALLTYPE -grpcsharp_event_type(const grpc_event *event) { - return event->type; -} - -GPR_EXPORT grpc_op_error GPR_CALLTYPE -grpcsharp_event_op_complete(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_OP_COMPLETE); - return event->data.op_complete; -} - -GPR_EXPORT grpc_op_error GPR_CALLTYPE -grpcsharp_event_write_accepted(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_WRITE_ACCEPTED); - return event->data.invoke_accepted; -} - -GPR_EXPORT grpc_op_error GPR_CALLTYPE -grpcsharp_event_finish_accepted(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISH_ACCEPTED); - return event->data.finish_accepted; -} - -GPR_EXPORT grpc_status_code GPR_CALLTYPE -grpcsharp_event_finished_status(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISHED); - return event->data.finished.status; -} - -GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_event_finished_details(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_FINISHED); - return event->data.finished.details; -} - -GPR_EXPORT gpr_intptr GPR_CALLTYPE -grpcsharp_event_read_length(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_READ); - if (!event->data.read) { - return -1; - } - return grpc_byte_buffer_length(event->data.read); -} - -/* - * Copies data from read event to a buffer. Fatal error occurs if - * buffer is too small. - */ -GPR_EXPORT void GPR_CALLTYPE -grpcsharp_event_read_copy_to_buffer(const grpc_event *event, char *buffer, - size_t buffer_len) { - grpc_byte_buffer_reader *reader; - gpr_slice slice; - size_t offset = 0; - - GPR_ASSERT(event->type == GRPC_READ); - reader = grpc_byte_buffer_reader_create(event->data.read); - - GPR_ASSERT(event->data.read); - while (grpc_byte_buffer_reader_next(reader, &slice)) { - size_t len = GPR_SLICE_LENGTH(slice); - GPR_ASSERT(offset + len <= buffer_len); - memcpy(buffer + offset, GPR_SLICE_START_PTR(slice), - GPR_SLICE_LENGTH(slice)); - offset += len; - gpr_slice_unref(slice); - } - grpc_byte_buffer_reader_destroy(reader); -} - -GPR_EXPORT grpc_call *GPR_CALLTYPE -grpcsharp_event_call(const grpc_event *event) { - /* we only allow this for newly incoming server calls. */ - GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); - return event->call; -} - -GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_event_server_rpc_new_method(const grpc_event *event) { - GPR_ASSERT(event->type == GRPC_SERVER_RPC_NEW); - return event->data.server_rpc_new.method; -} - /* Timespec */ GPR_EXPORT gpr_timespec GPR_CALLTYPE gprsharp_now(void) { return gpr_now(); } @@ -380,31 +273,6 @@ GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) { /* Call */ -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_add_metadata_old(grpc_call *call, grpc_metadata *metadata, - gpr_uint32 flags) { - return grpc_call_add_metadata_old(call, metadata, flags); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, void *finished_tag, - gpr_uint32 flags) { - return grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag, flags); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_server_accept_old(grpc_call *call, grpc_completion_queue *cq, - void *finished_tag) { - return grpc_call_server_accept_old(call, cq, finished_tag); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - return grpc_call_server_end_initial_metadata_old(call, flags); -} - GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) { return grpc_call_cancel(call); } @@ -415,30 +283,6 @@ grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status, return grpc_call_cancel_with_status(call, status, description); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_write_old(grpc_call *call, grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - return grpc_call_start_write_old(call, byte_buffer, tag, flags); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_write_status_old(grpc_call *call, - grpc_status_code status_code, - const char *status_message, void *tag) { - return grpc_call_start_write_status_old(call, status_code, status_message, - tag); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_writes_done_old(grpc_call *call, void *tag) { - return grpc_call_writes_done_old(call, tag); -} - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_call_start_read_old(grpc_call *call, void *tag) { - return grpc_call_start_read_old(call, tag); -} - GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { grpc_call_destroy(call); } @@ -453,48 +297,6 @@ grpcsharp_call_start_write_from_copied_buffer(grpc_call *call, grpc_byte_buffer_destroy(byte_buffer); } -/* Server */ - -GPR_EXPORT grpc_call_error GPR_CALLTYPE -grpcsharp_server_request_call_old(grpc_server *server, void *tag_new) { - return grpc_server_request_call_old(server, tag_new); -} - -GPR_EXPORT grpc_server *GPR_CALLTYPE -grpcsharp_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - return grpc_server_create(cq, args); -} - -GPR_EXPORT int GPR_CALLTYPE -grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) { - return grpc_server_add_http2_port(server, addr); -} - -GPR_EXPORT int GPR_CALLTYPE -grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) { - return grpc_server_add_secure_http2_port(server, addr); -} - -GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) { - grpc_server_start(server); -} - -GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_shutdown(grpc_server *server) { - grpc_server_shutdown(server); -} - -GPR_EXPORT void GPR_CALLTYPE -grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) { - grpc_server_shutdown_and_notify(server, tag); -} - -GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { - grpc_server_destroy(server); -} - -/* New API Experiments */ - GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, const char *send_buffer, size_t send_buffer_len) { @@ -692,6 +494,40 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_cal return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); } +/* Server */ + +GPR_EXPORT grpc_server *GPR_CALLTYPE +grpcsharp_server_create(grpc_completion_queue *cq, + const grpc_channel_args *args) { + return grpc_server_create(cq, args); +} + +GPR_EXPORT int GPR_CALLTYPE +grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) { + return grpc_server_add_http2_port(server, addr); +} + +GPR_EXPORT int GPR_CALLTYPE +grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) { + return grpc_server_add_secure_http2_port(server, addr); +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) { + grpc_server_start(server); +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_shutdown(grpc_server *server) { + grpc_server_shutdown(server); +} + +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_server_shutdown_and_notify(grpc_server *server, void *tag) { + grpc_server_shutdown_and_notify(server, tag); +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { + grpc_server_destroy(server); +} GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, callback_funcptr callback) { @@ -704,7 +540,3 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_serve &(ctx->server_rpc_new.request_metadata), cq, ctx); } - - - - From a96afb013babf5afd8d47b195d616cd03b93d677 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 17 Feb 2015 19:23:55 -0800 Subject: [PATCH 04/10] renaming file name to match class name --- src/csharp/GrpcCore/GrpcCore.csproj | 2 +- ...chContextSafeHandle.cs => BatchContextSafeHandleNotOwned.cs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/csharp/GrpcCore/Internal/{BatchContextSafeHandle.cs => BatchContextSafeHandleNotOwned.cs} (100%) diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index a574f181c8a..ee76b742ce4 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -59,9 +59,9 @@ - + diff --git a/src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs b/src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs similarity index 100% rename from src/csharp/GrpcCore/Internal/BatchContextSafeHandle.cs rename to src/csharp/GrpcCore/Internal/BatchContextSafeHandleNotOwned.cs From 607307d0beca6b3742ba446390603b42f5a57c19 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 11:05:45 -0800 Subject: [PATCH 05/10] Cleanup of AsyncCall.cs --- .../GrpcApiTests/MathClientServerTests.cs | 18 +- src/csharp/GrpcCore/GrpcEnvironment.cs | 2 +- src/csharp/GrpcCore/Internal/AsyncCall.cs | 325 ++++++++++-------- src/csharp/GrpcCore/ServerCallHandler.cs | 11 +- src/csharp/GrpcCoreTests/ClientServerTest.cs | 23 +- 5 files changed, 203 insertions(+), 176 deletions(-) diff --git a/src/csharp/GrpcApiTests/MathClientServerTests.cs b/src/csharp/GrpcApiTests/MathClientServerTests.cs index bd298b0932f..9056142097b 100644 --- a/src/csharp/GrpcApiTests/MathClientServerTests.cs +++ b/src/csharp/GrpcApiTests/MathClientServerTests.cs @@ -64,6 +64,15 @@ namespace math.Tests client = MathGrpc.NewStub(channel); } + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + [Test] public void Div1() { @@ -136,15 +145,6 @@ namespace math.Tests CollectionAssert.AreEqual(new long[] {3, 4, 3}, result.ConvertAll((divReply) => divReply.Quotient)); CollectionAssert.AreEqual(new long[] {1, 16, 1}, result.ConvertAll((divReply) => divReply.Remainder)); } - - [TestFixtureTearDown] - public void Cleanup() - { - channel.Dispose(); - - server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); - } } } diff --git a/src/csharp/GrpcCore/GrpcEnvironment.cs b/src/csharp/GrpcCore/GrpcEnvironment.cs index c4f030267d2..55a6cac8f69 100644 --- a/src/csharp/GrpcCore/GrpcEnvironment.cs +++ b/src/csharp/GrpcCore/GrpcEnvironment.cs @@ -42,7 +42,7 @@ namespace Google.GRPC.Core /// public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 1; + const int THREAD_POOL_SIZE = 4; [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_init(); diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index ae7428978ea..ce0ba30d53d 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -42,15 +42,13 @@ using Google.GRPC.Core.Internal; namespace Google.GRPC.Core.Internal { /// - /// Handle native call lifecycle and provides convenience methods. + /// Handles native call lifecycle and provides convenience methods. /// - internal class AsyncCall : IDisposable + internal class AsyncCall { readonly Func serializer; readonly Func deserializer; - // TODO: make sure the delegate doesn't get garbage collected while - // native callbacks are in the completion queue. readonly CompletionCallbackDelegate unaryResponseHandler; readonly CompletionCallbackDelegate finishedHandler; readonly CompletionCallbackDelegate writeFinishedHandler; @@ -59,35 +57,44 @@ namespace Google.GRPC.Core.Internal readonly CompletionCallbackDelegate finishedServersideHandler; object myLock = new object(); - bool disposed; + GCHandle gchandle; CallSafeHandle call; + bool disposed; bool server; + bool started; bool errorOccured; - bool cancelRequested; + bool readingDone; bool halfcloseRequested; bool halfclosed; - bool doneWithReading; - Nullable finishedStatus; + bool finished; + // Completion of a pending write if not null. TaskCompletionSource writeTcs; + + // Completion of a pending read if not null. TaskCompletionSource readTcs; - TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); - TaskCompletionSource halfcloseTcs = new TaskCompletionSource(); - TaskCompletionSource finishedTcs = new TaskCompletionSource(); + // Completion of a pending halfclose if not null. + TaskCompletionSource halfcloseTcs; + // Completion of a pending unary response if not null. TaskCompletionSource unaryResponseTcs; + // Set after status is received on client. Only used for server streaming and duplex streaming calls. + Nullable finishedStatus; + TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); + + // For streaming, the reads will be delivered to this observer. IObserver readObserver; public AsyncCall(Func serializer, Func deserializer) { this.serializer = serializer; this.deserializer = deserializer; - this.unaryResponseHandler = HandleUnaryResponseCompletion; + this.unaryResponseHandler = HandleUnaryResponse; this.finishedHandler = HandleFinished; this.writeFinishedHandler = HandleWriteFinished; this.readFinishedHandler = HandleReadFinished; @@ -95,46 +102,23 @@ namespace Google.GRPC.Core.Internal this.finishedServersideHandler = HandleFinishedServerside; } - /// - /// Initiates reading to given observer. - /// - public void StartReadingToStream(IObserver readObserver) { - lock (myLock) - { - CheckStarted(); - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } - this.readObserver = readObserver; - ReceiveMessageAsync(); - } - } - - public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - lock (myLock) - { - this.call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); - } + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) + { + InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); } public void InitializeServer(CallSafeHandle call) { - lock(myLock) - { - this.call = call; - started = true; - server = true; - } + InitializeInternal(call, true); } - public Task UnaryCallAsync(TWrite msg) { lock (myLock) { started = true; halfcloseRequested = true; + readingDone = true; // TODO: handle serialization error... byte[] payload = serializer(msg); @@ -151,6 +135,7 @@ namespace Google.GRPC.Core.Internal lock (myLock) { started = true; + readingDone = true; unaryResponseTcs = new TaskCompletionSource(); call.StartClientStreaming(unaryResponseHandler); @@ -191,15 +176,43 @@ namespace Google.GRPC.Core.Internal } } - public Task SendMessageAsync(TWrite msg) { + public Task ServerSideUnaryRequestCallAsync() + { lock (myLock) { + started = true; + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; + } + } + + public Task ServerSideStreamingRequestCallAsync(IObserver readObserver) + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + + if (this.readObserver != null) + { + throw new InvalidOperationException("Already registered an observer."); + } + this.readObserver = readObserver; + ReceiveMessageAsync(); + + return finishedServersideTcs.Task; + } + } + + public Task SendMessageAsync(TWrite msg) + { + lock (myLock) + { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } @@ -222,18 +235,19 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } call.StartSendCloseFromClient(halfclosedHandler); + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); return halfcloseTcs.Task; } } @@ -242,18 +256,18 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - CheckCancelNotRequested(); - if (halfcloseRequested || halfclosed) + if (halfcloseRequested) { throw new InvalidOperationException("Already halfclosed."); } call.StartSendStatusFromServer(status, halfclosedHandler); halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); return halfcloseTcs.Task; } } @@ -262,13 +276,11 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); CheckNoError(); - // TODO: add check for not cancelled? - - if (doneWithReading) + if (readingDone) { throw new InvalidOperationException("Already read the last message."); } @@ -285,22 +297,12 @@ namespace Google.GRPC.Core.Internal } } - internal Task StartServerSide() - { - lock (myLock) - { - call.StartServerSide(finishedServersideHandler); - return finishedServersideTcs.Task; - } - } - public void Cancel() { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel is threadsafe @@ -311,41 +313,23 @@ namespace Google.GRPC.Core.Internal { lock (myLock) { + CheckNotDisposed(); CheckStarted(); - CheckNotFinished(); - cancelRequested = true; } // grpc_call_cancel_with_status is threadsafe call.CancelWithStatus(status); } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (!disposed) - { - if (disposing) - { - if (call != null) - { - call.Dispose(); - } - } - disposed = true; - } - } - private void UpdateErrorOccured(GRPCOpError error) + private void InitializeInternal(CallSafeHandle call, bool server) { - if (error == GRPCOpError.GRPC_OP_ERROR) + lock (myLock) { - errorOccured = true; + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + this.server = server; } } @@ -357,41 +341,46 @@ namespace Google.GRPC.Core.Internal } } - private void CheckNoError() + private void CheckNotDisposed() { - if (errorOccured) + if (disposed) { - throw new InvalidOperationException("Error occured when processing call."); + throw new InvalidOperationException("Call has already been disposed."); } } - private void CheckNotFinished() + private void CheckNoError() { - if (finishedStatus.HasValue) + if (errorOccured) { - throw new InvalidOperationException("Already finished."); + throw new InvalidOperationException("Error occured when processing call."); } } - private void CheckCancelNotRequested() + private bool ReleaseResourcesIfPossible() { - if (cancelRequested) + if (!disposed && call != null) { - throw new InvalidOperationException("Cancel has been requested."); + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } } + return false; } - private void DisposeResourcesIfNeeded() + private void ReleaseResources() { - if (call != null && started && finishedStatus.HasValue) - { - // TODO: should we also wait for all the pending events to finish? - + if (call != null) { call.Dispose(); } + gchandle.Free(); + disposed = true; } - private void CompleteStreamObserver(Status status) { + private void CompleteStreamObserver(Status status) + { if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { // TODO: wrap to handle exceptions; @@ -402,20 +391,27 @@ namespace Google.GRPC.Core.Internal } } - private void HandleUnaryResponseCompletion(GRPCOpError error, IntPtr batchContextPtr) { - try { - + /// + /// Handler for unary response completion. + /// + private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + { + try + { TaskCompletionSource tcs; - lock(myLock) { + lock(myLock) + { + finished = true; + halfclosed = true; tcs = unaryResponseTcs; - } - // we're done with this call, get rid of the native object. - call.Dispose(); + ReleaseResourcesIfPossible(); + } var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - if (error != GRPCOpError.GRPC_OP_OK) { + if (error != GRPCOpError.GRPC_OP_OK) + { tcs.SetException(new RpcException( new Status(StatusCode.GRPC_STATUS_INTERNAL, "Internal error occured.") )); @@ -423,7 +419,8 @@ namespace Google.GRPC.Core.Internal } var status = ctx.GetReceivedStatus(); - if (status.StatusCode != StatusCode.GRPC_STATUS_OK) { + if (status.StatusCode != StatusCode.GRPC_STATUS_OK) + { tcs.SetException(new RpcException(status)); return; } @@ -431,18 +428,20 @@ namespace Google.GRPC.Core.Internal // TODO: handle deserialize error... var msg = deserializer(ctx.GetReceivedMessage()); tcs.SetResult(msg); - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { - + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { TaskCompletionSource oldTcs = null; lock (myLock) { - UpdateErrorOccured(error); oldTcs = writeTcs; writeTcs = null; } @@ -458,20 +457,25 @@ namespace Google.GRPC.Core.Internal oldTcs.SetResult(null); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) + { + try + { lock (myLock) { - UpdateErrorOccured(error); halfclosed = true; + + ReleaseResourcesIfPossible(); } - if (errorOccured) + if (error != GRPCOpError.GRPC_OP_OK) { halfcloseTcs.SetException(new Exception("Halfclose failed")); @@ -480,14 +484,17 @@ namespace Google.GRPC.Core.Internal { halfcloseTcs.SetResult(null); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { - + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); var payload = ctx.GetReceivedMessage(); @@ -502,7 +509,7 @@ namespace Google.GRPC.Core.Internal readTcs = null; if (payload == null) { - doneWithReading = true; + readingDone = true; } observer = readObserver; status = finishedStatus; @@ -515,7 +522,8 @@ namespace Google.GRPC.Core.Internal // TODO: make sure we deliver reads in the right order. - if (observer != null) { + if (observer != null) + { if (payload != null) { // TODO: wrap to handle exceptions @@ -526,58 +534,81 @@ namespace Google.GRPC.Core.Internal } else { - if (!server) { - if (status.HasValue) { + if (!server) + { + if (status.HasValue) + { CompleteStreamObserver(status.Value); } - } else { + } + else + { // TODO: wrap to handle exceptions.. observer.OnCompleted(); } // TODO: completeStreamObserver serverside... } } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); var status = ctx.GetReceivedStatus(); - bool wasDoneWithReading; + bool wasReadingDone; lock (myLock) { + finished = true; finishedStatus = status; - DisposeResourcesIfNeeded(); + wasReadingDone = readingDone; - wasDoneWithReading = doneWithReading; + ReleaseResourcesIfPossible(); } - if (wasDoneWithReading) { + if (wasReadingDone) { CompleteStreamObserver(status); } - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } - private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) { - try { + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) + { + try + { var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + lock(myLock) + { + finished = true; + + // TODO: because of the way server calls are implemented, we need to set + // reading done to true here. Should be fixed in the future. + readingDone = true; + + ReleaseResourcesIfPossible(); + } // TODO: handle error ... finishedServersideTcs.SetResult(null); - call.Dispose(); - - } catch(Exception e) { + } + catch(Exception e) + { Console.WriteLine("Caught exception in a native handler: " + e); } } diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index 3bc3b153964..73dfa52def8 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -60,7 +60,7 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); var request = asyncCall.ReceiveMessageAsync().Result; @@ -91,14 +91,9 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); - var responseObserver = new ServerStreamingOutputObserver(asyncCall); var requestObserver = handler(responseObserver); - - // feed the requests - asyncCall.StartReadingToStream(requestObserver); - + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); finishedTask.Wait(); } } @@ -114,7 +109,7 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.StartServerSide(); + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index dd3fc7038e7..37d770e0c04 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -52,11 +52,21 @@ namespace Google.GRPC.Core.Tests Marshallers.StringMarshaller, Marshallers.StringMarshaller); - [Test] - public void UnaryCall() + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + } + + [TestFixtureTearDown] + public void Cleanup() { GrpcEnvironment.Initialize(); + } + [Test] + public void UnaryCall() + { Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService") @@ -82,8 +92,6 @@ namespace Google.GRPC.Core.Tests [Test] public void UnaryCallPerformance() { - GrpcEnvironment.Initialize(); - Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService") @@ -107,16 +115,11 @@ namespace Google.GRPC.Core.Tests } server.ShutdownAsync().Wait(); - - GrpcEnvironment.Shutdown(); } - [Test] public void UnknownMethodHandler() { - GrpcEnvironment.Initialize(); - Server server = new Server(); server.AddServiceDefinition( ServerServiceDefinition.CreateBuilder("someService").Build()); @@ -137,8 +140,6 @@ namespace Google.GRPC.Core.Tests } server.ShutdownAsync().Wait(); - - GrpcEnvironment.Shutdown(); } private void HandleUnaryEchoString(string request, IObserver responseObserver) { From fa21673cf8469a9fdc5848de4a9a9d9914b8e5e4 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 11:06:44 -0800 Subject: [PATCH 06/10] clang-format --- src/csharp/ext/grpc_csharp_ext.c | 186 ++++++++++++++++++------------- 1 file changed, 108 insertions(+), 78 deletions(-) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 2961a708be8..1dd6c692e5f 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -60,7 +60,8 @@ grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) { return bb; } -typedef void(GPR_CALLTYPE * callback_funcptr)(grpc_op_error op_error, void *batch_context); +typedef void(GPR_CALLTYPE *callback_funcptr)(grpc_op_error op_error, + void *batch_context); /* * Helper to maintain lifetime of batch op inputs and store batch op outputs. @@ -117,7 +118,8 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { grpc_byte_buffer_destroy(ctx->send_message); - grpcsharp_metadata_array_destroy_recursive(&(ctx->send_status_from_server.trailing_metadata)); + grpcsharp_metadata_array_destroy_recursive( + &(ctx->send_status_from_server.trailing_metadata)); gpr_free(ctx->send_status_from_server.status_details); grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); @@ -125,9 +127,10 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { grpc_byte_buffer_destroy(ctx->recv_message); grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); - gpr_free((void*) ctx->recv_status_on_client.status_details); + gpr_free((void *)ctx->recv_status_on_client.status_details); - /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is supposed + /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is + supposed to take its ownership. */ grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); @@ -136,20 +139,20 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { gpr_free(ctx); } -GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length(const grpcsharp_batch_context *ctx) { +GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( + const grpcsharp_batch_context *ctx) { if (!ctx->recv_message) { - return -1; - } - return grpc_byte_buffer_length(ctx->recv_message); + return -1; + } + return grpc_byte_buffer_length(ctx->recv_message); } /* * Copies data from recv_message to a buffer. Fatal error occurs if * buffer is too small. */ -GPR_EXPORT void GPR_CALLTYPE -grpcsharp_batch_context_recv_message_to_buffer(const grpcsharp_batch_context *ctx, char *buffer, - size_t buffer_len) { +GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_recv_message_to_buffer( + const grpcsharp_batch_context *ctx, char *buffer, size_t buffer_len) { grpc_byte_buffer_reader *reader; gpr_slice slice; size_t offset = 0; @@ -168,26 +171,28 @@ grpcsharp_batch_context_recv_message_to_buffer(const grpcsharp_batch_context *ct } GPR_EXPORT grpc_status_code GPR_CALLTYPE -grpcsharp_batch_context_recv_status_on_client_status(const grpcsharp_batch_context *ctx) { +grpcsharp_batch_context_recv_status_on_client_status( + const grpcsharp_batch_context *ctx) { return ctx->recv_status_on_client.status; } GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_batch_context_recv_status_on_client_details(const grpcsharp_batch_context *ctx) { +grpcsharp_batch_context_recv_status_on_client_details( + const grpcsharp_batch_context *ctx) { return ctx->recv_status_on_client.status_details; } -GPR_EXPORT grpc_call* GPR_CALLTYPE -grpcsharp_batch_context_server_rpc_new_call(const grpcsharp_batch_context *ctx) { +GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call( + const grpcsharp_batch_context *ctx) { return ctx->server_rpc_new.call; } GPR_EXPORT const char *GPR_CALLTYPE -grpcsharp_batch_context_server_rpc_new_method(const grpcsharp_batch_context *ctx) { +grpcsharp_batch_context_server_rpc_new_method( + const grpcsharp_batch_context *ctx) { return ctx->server_rpc_new.call_details.method; } - /* Init & shutdown */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_init(void) { grpc_init(); } @@ -222,11 +227,10 @@ grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) { t = ev->type; if (t == GRPC_OP_COMPLETE && ev->tag) { /* NEW API handler */ - batch_context = (grpcsharp_batch_context *) ev->tag; + batch_context = (grpcsharp_batch_context *)ev->tag; batch_context->callback(ev->data.op_complete, batch_context); grpcsharp_batch_context_destroy(batch_context); - } else - if (ev->tag) { + } else if (ev->tag) { /* call the callback in ev->tag */ /* C forbids to cast object pointers to function pointers, so * we cast to intptr first. @@ -253,9 +257,10 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { grpc_channel_destroy(channel); } -GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, - const char *method, - const char *host, gpr_timespec deadline) { +GPR_EXPORT grpc_call *GPR_CALLTYPE +grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec deadline) { return grpc_channel_create_call(channel, cq, method, host, deadline); } @@ -297,9 +302,9 @@ grpcsharp_call_start_write_from_copied_buffer(grpc_call *call, grpc_byte_buffer_destroy(byte_buffer); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *call, - callback_funcptr callback, - const char *send_buffer, size_t send_buffer_len) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { /* TODO: don't use magic number */ grpc_op ops[6]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -324,17 +329,22 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *ca ops[4].data.recv_message = &(ctx->recv_message); ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[5].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[5].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + ops[5].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[5].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[5].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[5].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[5].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[5].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, - callback_funcptr callback) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_client_streaming(grpc_call *call, + callback_funcptr callback) { /* TODO: don't use magic number */ grpc_op ops[4]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -353,18 +363,24 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(gr ops[2].data.recv_message = &(ctx->recv_message); ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + ops[3].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[3].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[3].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[3].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(grpc_call *call, - callback_funcptr callback, - const char *send_buffer, size_t send_buffer_len) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_server_streaming(grpc_call *call, + callback_funcptr callback, + const char *send_buffer, + size_t send_buffer_len) { /* TODO: don't use magic number */ grpc_op ops[5]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -386,17 +402,22 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(gr ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[4].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[4].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + ops[4].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[4].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[4].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[4].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[4].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[4].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(grpc_call *call, - callback_funcptr callback) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_duplex_streaming(grpc_call *call, + callback_funcptr callback) { /* TODO: don't use magic number */ grpc_op ops[3]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -412,18 +433,22 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(gr ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[2].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[2].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); + ops[2].data.recv_status_on_client.trailing_metadata = + &(ctx->recv_status_on_client.trailing_metadata); + ops[2].data.recv_status_on_client.status = + &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[2].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[2].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); + ops[2].data.recv_status_on_client.status_details = + &(ctx->recv_status_on_client.status_details); + ops[2].data.recv_status_on_client.status_details_capacity = + &(ctx->recv_status_on_client.status_details_capacity); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, - callback_funcptr callback, - const char *send_buffer, size_t send_buffer_len) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { /* TODO: don't use magic number */ grpc_op ops[1]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -433,11 +458,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *c ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[0].data.send_message = ctx->send_message; - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(grpc_call *call, - callback_funcptr callback) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_send_close_from_client(grpc_call *call, + callback_funcptr callback) { /* TODO: don't use magic number */ grpc_op ops[1]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -445,11 +471,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(gr ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, - callback_funcptr callback, grpc_status_code status_code, const char* status_details) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_send_status_from_server(grpc_call *call, + callback_funcptr callback, + grpc_status_code status_code, + const char *status_details) { /* TODO: don't use magic number */ grpc_op ops[1]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -457,15 +486,16 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(g ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; - ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); + ops[0].data.send_status_from_server.status_details = + gpr_strdup(status_details); ops[0].data.send_status_from_server.trailing_metadata = NULL; ops[0].data.send_status_from_server.trailing_metadata_count = 0; - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_message(grpc_call *call, - callback_funcptr callback) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) { /* TODO: don't use magic number */ grpc_op ops[1]; grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); @@ -473,11 +503,11 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_message(grpc_call *c ops[0].op = GRPC_OP_RECV_MESSAGE; ops[0].data.recv_message = &(ctx->recv_message); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_call *call, - callback_funcptr callback) { +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) { /* TODO: don't use magic number */ grpc_op ops[2]; @@ -489,9 +519,10 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_cal ops[0].data.send_initial_metadata.metadata = NULL; ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops[1].data.recv_close_on_server.cancelled = (&ctx->recv_close_on_server_cancelled); + ops[1].data.recv_close_on_server.cancelled = + (&ctx->recv_close_on_server_cancelled); - return grpc_call_start_batch(call, ops, sizeof(ops)/sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } /* Server */ @@ -529,14 +560,13 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) { grpc_server_destroy(server); } -GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_server_request_call(grpc_server *server, - grpc_completion_queue *cq, callback_funcptr callback) { - +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, + callback_funcptr callback) { grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); ctx->callback = callback; - return grpc_server_request_call(server, &(ctx->server_rpc_new.call), - &(ctx->server_rpc_new.call_details), - &(ctx->server_rpc_new.request_metadata), - cq, ctx); + return grpc_server_request_call( + server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), + &(ctx->server_rpc_new.request_metadata), cq, ctx); } From 37afb9ab2b2e5a8f6a3bae2546e26eda22256976 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 11:20:04 -0800 Subject: [PATCH 07/10] fixing unknown method call handler on server --- src/csharp/GrpcCore/ServerCallHandler.cs | 18 +++++++++++++++++- src/csharp/GrpcCoreTests/ClientServerTest.cs | 2 -- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs index 73dfa52def8..48d1eaa3359 100644 --- a/src/csharp/GrpcCore/ServerCallHandler.cs +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -109,12 +109,28 @@ namespace Google.GRPC.Core asyncCall.InitializeServer(call); - var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver()); asyncCall.SendStatusFromServerAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); finishedTask.Wait(); } } + + internal class NullObserver : IObserver + { + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + + } } diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 37d770e0c04..d0e357e29a6 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -85,8 +85,6 @@ namespace Google.GRPC.Core.Tests } server.ShutdownAsync().Wait(); - - GrpcEnvironment.Shutdown(); } [Test] From 8d7ce43aa4993cf71e57de9a7c2ae94b01248bef Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 11:20:43 -0800 Subject: [PATCH 08/10] formatting --- src/csharp/GrpcCoreTests/ClientServerTest.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index d0e357e29a6..e76189974d3 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -140,7 +140,8 @@ namespace Google.GRPC.Core.Tests server.ShutdownAsync().Wait(); } - private void HandleUnaryEchoString(string request, IObserver responseObserver) { + private void HandleUnaryEchoString(string request, IObserver responseObserver) + { responseObserver.OnNext(request); responseObserver.OnCompleted(); } From 8d2e572371afc771b107d1ba6fb56375bd7d46be Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 12:41:14 -0800 Subject: [PATCH 09/10] got rid of server_add_secure_http2_port --- src/csharp/GrpcCore/Internal/ServerSafeHandle.cs | 8 +------- src/csharp/ext/grpc_csharp_ext.c | 7 +------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index c0966028008..047bde1addf 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -52,13 +52,8 @@ namespace Google.GRPC.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); - // TODO: check int representation size [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); - - // TODO: check int representation size - [DllImport("grpc_csharp_ext.dll")] - static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr); + static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_start(ServerSafeHandle server); @@ -85,7 +80,6 @@ namespace Google.GRPC.Core.Internal public int AddPort(string addr) { - // TODO: also grpc_server_add_secure_http2_port... return grpcsharp_server_add_http2_port(this, addr); } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 1dd6c692e5f..304ee9cf34c 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -533,16 +533,11 @@ grpcsharp_server_create(grpc_completion_queue *cq, return grpc_server_create(cq, args); } -GPR_EXPORT int GPR_CALLTYPE +GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_server_add_http2_port(grpc_server *server, const char *addr) { return grpc_server_add_http2_port(server, addr); } -GPR_EXPORT int GPR_CALLTYPE -grpcsharp_server_add_secure_http2_port(grpc_server *server, const char *addr) { - return grpc_server_add_secure_http2_port(server, addr); -} - GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_start(grpc_server *server) { grpc_server_start(server); } From ec77624a9f56c1158e1e425c0cbdba591ad2d86a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 18 Feb 2015 14:06:56 -0800 Subject: [PATCH 10/10] fix typo: shutdown should be used in teardown. --- src/csharp/GrpcCoreTests/ClientServerTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index e76189974d3..ba43e4f6a07 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -61,7 +61,7 @@ namespace Google.GRPC.Core.Tests [TestFixtureTearDown] public void Cleanup() { - GrpcEnvironment.Initialize(); + GrpcEnvironment.Shutdown(); } [Test]