diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 19d3f57abbb..c97a3bc2b16 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -1,11 +1,11 @@ #region Copyright notice and license // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -15,7 +15,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -33,204 +33,160 @@ using System; using System.Diagnostics; using System.Runtime.InteropServices; using Grpc.Core; - -namespace Grpc.Core.Internal { - internal delegate void CompletionCallbackDelegate(GRPCOpError error, - IntPtr batchContextPtr); - - /// - /// grpc_call from - /// - internal class CallSafeHandle : SafeHandleZeroIsInvalid { - const uint GRPC_WRITE_BUFFER_HINT = 1; - - [DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle - grpcsharp_channel_create_call(ChannelSafeHandle channel, - CompletionQueueSafeHandle cq, string method, - string host, Timespec deadline); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_cancel(CallSafeHandle call); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, - string description); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_start_unary( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] static extern void - grpcsharp_call_blocking_unary( - CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_start_client_streaming( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_start_server_streaming( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_start_duplex_streaming( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - MetadataArraySafeHandle metadataArray); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_send_message( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - byte[] send_buffer, UIntPtr send_buffer_len); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_send_close_from_client( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_send_status_from_server( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback, - StatusCode statusCode, string statusMessage); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_recv_message( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback); - - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_call_start_serverside( - CallSafeHandle call, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback); - - [DllImport("grpc_csharp_ext.dll")] static extern void - grpcsharp_call_destroy(IntPtr call); - - private - CallSafeHandle() {} - - public - static CallSafeHandle Create(ChannelSafeHandle channel, - CompletionQueueSafeHandle cq, string method, - string host, Timespec deadline) { - return grpcsharp_channel_create_call(channel, cq, method, host, deadline); - } - - public - void StartUnary(byte[] payload, CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_unary( - this, callback, payload, new UIntPtr((ulong)payload.Length), - metadataArray)); - } - - public - void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, - CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray) { - grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, - new UIntPtr((ulong)payload.Length), - metadataArray); - } - - public - void StartClientStreaming(CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray) { - AssertCallOk( - grpcsharp_call_start_client_streaming(this, callback, metadataArray)); - } - - public - void StartServerStreaming(byte[] payload, - CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray) { - AssertCallOk(grpcsharp_call_start_server_streaming( - this, callback, payload, new UIntPtr((ulong)payload.Length), - metadataArray)); - } - - public - void StartDuplexStreaming(CompletionCallbackDelegate callback, - MetadataArraySafeHandle metadataArray) { - AssertCallOk( - grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); - } - - public - void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_send_message( - this, callback, payload, new UIntPtr((ulong)payload.Length))); - } - - public - void StartSendCloseFromClient(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); - } - - public - void StartSendStatusFromServer(Status status, - CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_send_status_from_server( - this, callback, status.StatusCode, status.Detail)); - } - - public - void StartReceiveMessage(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_recv_message(this, callback)); - } - - public - void StartServerSide(CompletionCallbackDelegate callback) { - AssertCallOk(grpcsharp_call_start_serverside(this, callback)); - } - - public - void Cancel() { AssertCallOk(grpcsharp_call_cancel(this)); } - - public - void CancelWithStatus(Status status) { - AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, - status.Detail)); - } - - protected - override bool ReleaseHandle() { - grpcsharp_call_destroy(handle); - return true; - } - - private - static void AssertCallOk(GRPCCallError callError) { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, - "Status not GRPC_CALL_OK"); - } - - private - static uint GetFlags(bool buffered) { - return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); + + /// + /// grpc_call from + /// + internal class CallSafeHandle : SafeHandleZeroIsInvalid + { + const uint GRPC_WRITE_BUFFER_HINT = 1; + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_destroy(IntPtr call); + + private CallSafeHandle() + { + } + + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + { + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); + } + + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + { + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); + } + + public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + { + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray); + } + + public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + { + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray)); + } + + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + { + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray)); + } + + public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray) + { + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray)); + } + + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length))); + } + + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); + } + + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); + } + + public void StartReceiveMessage(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_recv_message(this, callback)); + } + + public void StartServerSide(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); + } + + public void Cancel() + { + AssertCallOk(grpcsharp_call_cancel(this)); + } + + public void CancelWithStatus(Status status) + { + AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); + } + + protected override bool ReleaseHandle() + { + grpcsharp_call_destroy(handle); + return true; + } + + private static void AssertCallOk(GRPCCallError callError) + { + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } + + private static uint GetFlags(bool buffered) + { + return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; + } } - } } \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index b56e8d98019..8080643d8c1 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -35,90 +35,91 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.InteropServices; - -namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before - // invoked. - internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); - - /// - /// grpc_server from grpc/grpc.h - /// - internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid { - [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError - grpcsharp_server_request_call( - ServerSafeHandle server, CompletionQueueSafeHandle cq, - [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate - callback); - - [DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle - grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); - - [DllImport("grpc_csharp_ext.dll")] static extern int - grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); - - [DllImport("grpc_csharp_ext.dll")] static extern int - grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, - ServerCredentialsSafeHandle creds); - - [DllImport("grpc_csharp_ext.dll")] static extern void - grpcsharp_server_start(ServerSafeHandle server); - - [DllImport("grpc_csharp_ext.dll")] static extern void - grpcsharp_server_shutdown(ServerSafeHandle server); - - // TODO: get rid of the old callback style - [DllImport( - "grpc_csharp_ext.dll", - EntryPoint = "grpcsharp_server_shutdown_and_notify")] static extern void - grpcsharp_server_shutdown_and_notify_CALLBACK( - ServerSafeHandle server, - [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate - callback); - - [DllImport("grpc_csharp_ext.dll")] static extern void - grpcsharp_server_destroy(IntPtr server); - - private - ServerSafeHandle() {} - - public - static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, - IntPtr args) { - return grpcsharp_server_create(cq, args); - } - - public - int AddListeningPort(string addr) { - return grpcsharp_server_add_http2_port(this, addr); - } - - public - int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) { - return grpcsharp_server_add_secure_http2_port(this, addr, credentials); - } - - public - void Start() { grpcsharp_server_start(this); } - - public - void Shutdown() { grpcsharp_server_shutdown(this); } - - public - void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) { - grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); - } - - public - GRPCCallError RequestCall(CompletionQueueSafeHandle cq, - CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call(this, cq, callback); - } - - protected - override bool ReleaseHandle() { - grpcsharp_server_destroy(handle); - return true; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + + /// + /// grpc_server from grpc/grpc.h + /// + internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); + + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_start(ServerSafeHandle server); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + + // TODO: get rid of the old callback style + [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_destroy(IntPtr server); + + private ServerSafeHandle() + { + } + + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + { + return grpcsharp_server_create(cq, args); + } + + public int AddListeningPort(string addr) + { + return grpcsharp_server_add_http2_port(this, addr); + } + + public int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) + { + return grpcsharp_server_add_secure_http2_port(this, addr, credentials); + } + + public void Start() + { + grpcsharp_server_start(this); + } + + public void Shutdown() + { + grpcsharp_server_shutdown(this); + } + + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) + { + grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); + } + + public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_server_request_call(this, cq, callback)); + } + + protected override bool ReleaseHandle() + { + grpcsharp_server_destroy(handle); + return true; + } + + private static void AssertCallOk(GRPCCallError callError) + { + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } } - } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 308fbfb71c8..e686cdddef7 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -38,187 +38,216 @@ using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; -namespace Grpc.Core { - /// - /// Server is implemented only to be able to do - /// in-process testing. - /// - public - class Server { - // TODO: make sure the delegate doesn't get garbage collected while - // native callbacks are in the completion queue. - readonly ServerShutdownCallbackDelegate serverShutdownHandler; - readonly CompletionCallbackDelegate newServerRpcHandler; - - readonly BlockingCollection newRpcQueue = - new BlockingCollection(); - readonly ServerSafeHandle handle; - - readonly Dictionary callHandlers = - new Dictionary(); - - readonly TaskCompletionSource shutdownTcs = - new TaskCompletionSource(); - - public - Server() { - this.handle = - ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); - this.newServerRpcHandler = HandleNewServerRpc; - this.serverShutdownHandler = HandleServerShutdown; - } - - // only call this before Start() - public - void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { - foreach (var entry in serviceDefinition.CallHandlers) { - callHandlers.Add(entry.Key, entry.Value); - } - } - - // only call before Start() - public - int AddListeningPort(string addr) { return handle.AddListeningPort(addr); } - - // only call before Start() - public - int AddListeningPort(string addr, ServerCredentials credentials) { - using(var nativeCredentials = credentials.ToNativeCredentials()) { - return handle.AddListeningPort(addr, nativeCredentials); - } - } - - public - void Start() { - handle.Start(); - - // TODO: this basically means the server is single threaded.... - StartHandlingRpcs(); - } - +namespace Grpc.Core +{ /// - /// Requests and handles single RPC call. + /// A gRPC server. /// - internal void RunRpc() { - AllowOneRpc(); - - try { - var rpcInfo = newRpcQueue.Take(); - - // Console.WriteLine("Server received RPC " + rpcInfo.Method); - - IServerCallHandler callHandler; - if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) { - callHandler = new NoSuchMethodCallHandler(); + public class Server + { + // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while + // native callbacks are in the completion queue. + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; + + readonly ServerSafeHandle handle; + readonly object myLock = new object(); + + readonly Dictionary callHandlers = new Dictionary(); + readonly TaskCompletionSource shutdownTcs = new TaskCompletionSource(); + + bool startRequested; + bool shutdownRequested; + + public Server() + { + this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + this.newServerRpcHandler = HandleNewServerRpc; + this.serverShutdownHandler = HandleServerShutdown; } - callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, - GetCompletionQueue()); - } catch (Exception e) { - Console.WriteLine("Exception while handling RPC: " + e); - } - } - /// - /// Requests server shutdown and when there are no more calls being - /// serviced, - /// cleans up used resources. - /// - /// The async. - public - async Task ShutdownAsync() { - handle.ShutdownAndNotify(serverShutdownHandler); - await shutdownTcs.Task; - handle.Dispose(); - } - - /// - /// To allow awaiting termination of the server. - /// - public - Task ShutdownTask { - get { return shutdownTcs.Task; } - } + /// + /// Adds a service definition to the server. This is how you register + /// handlers for a service with the server. + /// Only call this before Start(). + /// + public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) + { + lock (myLock) + { + Preconditions.CheckState(!startRequested); + foreach (var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } + } + } - public - void Kill() { handle.Dispose(); } + /// + /// Add a non-secure port on which server should listen. + /// Only call this before Start(). + /// + public int AddListeningPort(string addr) + { + lock (myLock) + { + Preconditions.CheckState(!startRequested); + return handle.AddListeningPort(addr); + } + } - private - async Task StartHandlingRpcs() { - while (true) { - await Task.Factory.StartNew(RunRpc); - } - } + /// + /// Add a secure port on which server should listen. + /// Only call this before Start(). + /// + public int AddListeningPort(string addr, ServerCredentials credentials) + { + lock (myLock) + { + Preconditions.CheckState(!startRequested); + using (var nativeCredentials = credentials.ToNativeCredentials()) + { + return handle.AddListeningPort(addr, nativeCredentials); + } + } + } - private - void AllowOneRpc() { - AssertCallOk( - handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); - } + /// + /// Starts the server. + /// + public void Start() + { + lock (myLock) + { + Preconditions.CheckState(!startRequested); + startRequested = true; + + handle.Start(); + AllowOneRpc(); + } + } - private - void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { - try { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + /// + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. The returned task finishes when shutdown procedure + /// is complete. + /// + public async Task ShutdownAsync() + { + lock (myLock) + { + Preconditions.CheckState(startRequested); + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } - if (error != GRPCOpError.GRPC_OP_OK) { - // TODO: handle error + /// + /// To allow awaiting termination of the server. + /// + public Task ShutdownTask + { + get + { + return shutdownTcs.Task; + } } - var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), - ctx.GetServerRpcNewMethod()); + public void Kill() + { + handle.Dispose(); + } - // after server shutdown, the callback returns with null call - if (!rpcInfo.Call.IsInvalid) { - newRpcQueue.Add(rpcInfo); + /// + /// Allows one new RPC call to be received by server. + /// + private void AllowOneRpc() + { + lock (myLock) + { + if (!shutdownRequested) + { + handle.RequestCall(GetCompletionQueue(), newServerRpcHandler); + } + } } - } catch (Exception e) { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } - private - void HandleServerShutdown(IntPtr eventPtr) { - try { - shutdownTcs.SetResult(null); - } catch (Exception e) { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } + /// + /// Selects corresponding handler for given call and handles the call. + /// + private void InvokeCallHandler(CallSafeHandle call, string method) + { + try + { + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(method, out callHandler)) + { + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(method, call, GetCompletionQueue()); + } + catch (Exception e) + { + Console.WriteLine("Exception while handling RPC: " + e); + } + } - private - static void AssertCallOk(GRPCCallError callError) { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, - "Status not GRPC_CALL_OK"); - } + /// + /// Handles the native callback. + /// + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) + { + // TODO: handle error + } + + CallSafeHandle call = ctx.GetServerRpcNewCall(); + string method = ctx.GetServerRpcNewMethod(); + + // after server shutdown, the callback returns with null call + if (!call.IsInvalid) + { + Task.Run(() => InvokeCallHandler(call, method)); + } + + AllowOneRpc(); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } - private - static CompletionQueueSafeHandle GetCompletionQueue() { - return GrpcEnvironment.ThreadPool.CompletionQueue; - } + /// + /// Handles native callback. + /// + /// + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } - private - struct NewRpcInfo { - private - CallSafeHandle call; - private - string method; - - public - NewRpcInfo(CallSafeHandle call, string method) { - this.call = call; - this.method = method; - } - - public - CallSafeHandle Call { - get { return this.call; } - } - - public - string Method { - get { return this.method; } - } + private static CompletionQueueSafeHandle GetCompletionQueue() + { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } } - } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index f9a28f4d0d6..abc7ef05e4b 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -34,26 +34,28 @@ using System.Runtime.InteropServices; using System.Threading; using Grpc.Core; -namespace math { -class MainClass { - public - static void Main(string[] args) { - String host = "0.0.0.0"; +namespace math +{ + class MainClass + { + public static void Main(string[] args) + { + string host = "0.0.0.0"; - GrpcEnvironment.Initialize(); + GrpcEnvironment.Initialize(); - Server server = new Server(); - server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); - int port = server.AddListeningPort(host + ":23456"); - server.Start(); + Server server = new Server(); + server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl())); + int port = server.AddListeningPort(host + ":23456"); + server.Start(); - Console.WriteLine("MathServer listening on port " + port); + Console.WriteLine("MathServer listening on port " + port); - Console.WriteLine("Press any key to stop the server..."); - Console.ReadKey(); + Console.WriteLine("Press any key to stop the server..."); + Console.ReadKey(); - server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); - } -} + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + } } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 088c4355859..6ae8041fb7c 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -16,8 +16,7 @@ full false bin\Debug - DEBUG; - + DEBUG; prompt 4 true @@ -81,5 +80,7 @@ PreserveNewest - + + + \ No newline at end of file