Merge pull request #1358 from jtattermusch/csharp_server_improvements

Improving C# server
pull/1361/head
Tim Emiola 10 years ago
commit 68180a2b92
  1. 3
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  2. 10
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  3. 176
      src/csharp/Grpc.Core/Server.cs
  4. 2
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  5. 4
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj

@ -33,6 +33,7 @@ using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@ -180,7 +181,7 @@ namespace Grpc.Core.Internal
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)

@ -35,6 +35,7 @@ using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@ -105,9 +106,9 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
return grpcsharp_server_request_call(this, cq, callback);
AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
}
protected override bool ReleaseHandle()
@ -115,5 +116,10 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle);
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
}
}

@ -38,27 +38,29 @@ using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
/// Server is implemented only to be able to do
/// in-process testing.
/// A gRPC server.
/// </summary>
public class Server
{
// TODO: make sure the delegate doesn't get garbage collected while
// TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
readonly object myLock = new object();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
bool startRequested;
bool shutdownRequested;
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
@ -66,71 +68,81 @@ namespace Grpc.Core
this.serverShutdownHandler = HandleServerShutdown;
}
// only call this before Start()
/// <summary>
/// Adds a service definition to the server. This is how you register
/// handlers for a service with the server.
/// Only call this before Start().
/// </summary>
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
{
foreach (var entry in serviceDefinition.CallHandlers)
lock (myLock)
{
callHandlers.Add(entry.Key, entry.Value);
Preconditions.CheckState(!startRequested);
foreach (var entry in serviceDefinition.CallHandlers)
{
callHandlers.Add(entry.Key, entry.Value);
}
}
}
// only call before Start()
/// <summary>
/// Add a non-secure port on which server should listen.
/// Only call this before Start().
/// </summary>
public int AddListeningPort(string addr)
{
return handle.AddListeningPort(addr);
}
// only call before Start()
public int AddListeningPort(string addr, ServerCredentials credentials)
{
using (var nativeCredentials = credentials.ToNativeCredentials())
lock (myLock)
{
return handle.AddListeningPort(addr, nativeCredentials);
Preconditions.CheckState(!startRequested);
return handle.AddListeningPort(addr);
}
}
public void Start()
{
handle.Start();
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
}
/// <summary>
/// Requests and handles single RPC call.
/// Add a secure port on which server should listen.
/// Only call this before Start().
/// </summary>
internal void RunRpc()
public int AddListeningPort(string addr, ServerCredentials credentials)
{
AllowOneRpc();
try
lock (myLock)
{
var rpcInfo = newRpcQueue.Take();
// Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
Preconditions.CheckState(!startRequested);
using (var nativeCredentials = credentials.ToNativeCredentials())
{
callHandler = new NoSuchMethodCallHandler();
return handle.AddListeningPort(addr, nativeCredentials);
}
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
catch (Exception e)
}
/// <summary>
/// Starts the server.
/// </summary>
public void Start()
{
lock (myLock)
{
Console.WriteLine("Exception while handling RPC: " + e);
Preconditions.CheckState(!startRequested);
startRequested = true;
handle.Start();
AllowOneRpc();
}
}
/// <summary>
/// Requests server shutdown and when there are no more calls being serviced,
/// cleans up used resources.
/// cleans up used resources. The returned task finishes when shutdown procedure
/// is complete.
/// </summary>
/// <returns>The async.</returns>
public async Task ShutdownAsync()
{
lock (myLock)
{
Preconditions.CheckState(startRequested);
Preconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
@ -152,19 +164,43 @@ namespace Grpc.Core
handle.Dispose();
}
private async Task StartHandlingRpcs()
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
private void AllowOneRpc()
{
while (true)
lock (myLock)
{
await Task.Factory.StartNew(RunRpc);
if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
}
}
}
private void AllowOneRpc()
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private void InvokeCallHandler(CallSafeHandle call, string method)
{
AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
try
{
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(method, call, GetCompletionQueue());
}
catch (Exception e)
{
Console.WriteLine("Exception while handling RPC: " + e);
}
}
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
{
try
@ -176,13 +212,16 @@ namespace Grpc.Core
// TODO: handle error
}
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid)
if (!call.IsInvalid)
{
newRpcQueue.Add(rpcInfo);
Task.Run(() => InvokeCallHandler(call, method));
}
AllowOneRpc();
}
catch (Exception e)
{
@ -190,6 +229,10 @@ namespace Grpc.Core
}
}
/// <summary>
/// Handles native callback.
/// </summary>
/// <param name="eventPtr"></param>
private void HandleServerShutdown(IntPtr eventPtr)
{
try
@ -202,42 +245,9 @@ namespace Grpc.Core
}
}
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
private struct NewRpcInfo
{
private CallSafeHandle call;
private string method;
public NewRpcInfo(CallSafeHandle call, string method)
{
this.call = call;
this.method = method;
}
public CallSafeHandle Call
{
get
{
return this.call;
}
}
public string Method
{
get
{
return this.method;
}
}
}
}
}

@ -40,7 +40,7 @@ namespace math
{
public static void Main(string[] args)
{
String host = "0.0.0.0";
string host = "0.0.0.0";
GrpcEnvironment.Initialize();

@ -80,5 +80,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup />
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
</Project>
Loading…
Cancel
Save