#region Copyright notice and license

// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#endregion

using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Google.GRPC.Core.Internal;

namespace Google.GRPC.Core
{
    /// <summary>
    /// Server is implemented only to be able to do
    /// in-process testing.
    /// </summary>
    public class Server
    {
        // TODO: make sure the delegate doesn't get garbage collected while
        // native callbacks are in the completion queue.
        readonly EventCallbackDelegate newRpcHandler;
        readonly EventCallbackDelegate serverShutdownHandler;

        // Hands newly arrived RPCs from the native callback (HandleNewRpc) to RunRpc.
        // Marked complete once the shutdown notification arrives so that a blocked
        // consumer is released instead of waiting forever.
        readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
        readonly ServerSafeHandle handle;

        // Maps method name to the handler registered through AddServiceDefinition.
        readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();

        // Completed by HandleServerShutdown; awaited by ShutdownAsync.
        readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();

        static Server()
        {
            GrpcEnvironment.EnsureInitialized();
        }

        /// <summary>
        /// Creates a new server bound to the environment's completion queue.
        /// </summary>
        public Server()
        {
            // TODO: what is the tag for server shutdown?
            this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
            this.newRpcHandler = HandleNewRpc;
            this.serverShutdownHandler = HandleServerShutdown;
        }

        /// <summary>
        /// Registers all call handlers of the given service definition.
        /// Only call this before <see cref="Start"/>.
        /// </summary>
        /// <param name="serviceDefinition">Service definition whose handlers are added.</param>
        /// <exception cref="ArgumentException">A handler with the same key is already registered.</exception>
        public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
        {
            foreach (var entry in serviceDefinition.CallHandlers)
            {
                callHandlers.Add(entry.Key, entry.Value);
            }
        }

        /// <summary>
        /// Adds a port for the given address. Only call this before <see cref="Start"/>.
        /// </summary>
        /// <param name="addr">Address passed through to the native server handle.</param>
        /// <returns>The value returned by the native handle's AddPort call.</returns>
        public int AddPort(string addr)
        {
            return handle.AddPort(addr);
        }

        /// <summary>
        /// Starts the server and begins handling RPCs.
        /// </summary>
        public void Start()
        {
            handle.Start();

            // TODO: this basically means the server is single threaded....
            StartHandlingRpcs();
        }

        /// <summary>
        /// Requests and handles single RPC call.
        /// Returns without handling anything once the server has been shut down.
        /// </summary>
        internal void RunRpc()
        {
            AllowOneRpc();

            try
            {
                // TryTake with an infinite timeout blocks like Take(), but returns false
                // (instead of blocking forever) once the queue is completed and empty.
                NewRpcInfo rpcInfo;
                if (!newRpcQueue.TryTake(out rpcInfo, System.Threading.Timeout.Infinite))
                {
                    return;
                }

                Console.WriteLine("Server received RPC " + rpcInfo.Method);

                IServerCallHandler callHandler;
                if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
                {
                    callHandler = new NoSuchMethodCallHandler();
                }
                callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception while handling RPC: " + e);
            }
        }

        /// <summary>
        /// Requests server shutdown and when there are no more calls being serviced,
        /// cleans up used resources.
        /// </summary>
        /// <returns>A task that completes after the shutdown notification has been
        /// received and the server handle has been disposed.</returns>
        public async Task ShutdownAsync()
        {
            handle.ShutdownAndNotify(serverShutdownHandler);
            await shutdownTcs.Task;
            handle.Dispose();
        }

        /// <summary>
        /// Disposes the server handle immediately, without requesting a shutdown first.
        /// </summary>
        public void Kill()
        {
            handle.Dispose();
        }

        /// <summary>
        /// Handles RPCs one at a time until the new-RPC queue is marked complete,
        /// which happens when the shutdown notification is delivered to HandleNewRpc.
        /// </summary>
        private async Task StartHandlingRpcs()
        {
            while (!newRpcQueue.IsAddingCompleted)
            {
                await Task.Factory.StartNew(RunRpc);
            }
        }

        /// <summary>
        /// Asks the native server to deliver one new RPC to <see cref="HandleNewRpc"/>.
        /// </summary>
        private void AllowOneRpc()
        {
            AssertCallOk(handle.RequestCall(newRpcHandler));
        }

        /// <summary>
        /// Native callback invoked when a requested call arrives. Exceptions are
        /// logged rather than propagated into native code.
        /// </summary>
        /// <param name="eventPtr">Pointer to the native event; not owned by this method.</param>
        private void HandleNewRpc(IntPtr eventPtr)
        {
            try
            {
                var ev = new EventSafeHandleNotOwned(eventPtr);
                var rpcInfo = new NewRpcInfo(ev.GetCall(), ev.GetServerRpcNewMethod());

                // after server shutdown, the callback returns with null call
                if (rpcInfo.Call.IsInvalid)
                {
                    // No further RPCs will arrive: release the consumer blocked in RunRpc
                    // and let the handling loop terminate.
                    newRpcQueue.CompleteAdding();
                }
                else
                {
                    newRpcQueue.Add(rpcInfo);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Caught exception in a native handler: " + e);
            }
        }

        /// <summary>
        /// Native callback invoked when server shutdown has finished; completes the
        /// task awaited by <see cref="ShutdownAsync"/>.
        /// </summary>
        /// <param name="eventPtr">Pointer to the native event (unused).</param>
        private void HandleServerShutdown(IntPtr eventPtr)
        {
            try
            {
                shutdownTcs.SetResult(null);
            }
            catch (Exception e)
            {
                Console.WriteLine("Caught exception in a native handler: " + e);
            }
        }

        private static void AssertCallOk(GRPCCallError callError)
        {
            Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
        }

        private static CompletionQueueSafeHandle GetCompletionQueue()
        {
            return GrpcEnvironment.ThreadPool.CompletionQueue;
        }

        /// <summary>
        /// Call handle and method name of a newly received RPC.
        /// </summary>
        private struct NewRpcInfo
        {
            private CallSafeHandle call;
            private string method;

            public NewRpcInfo(CallSafeHandle call, string method)
            {
                this.call = call;
                this.method = method;
            }

            public CallSafeHandle Call
            {
                get
                {
                    return this.call;
                }
            }

            public string Method
            {
                get
                {
                    return this.method;
                }
            }
        }
    }
}